This is an automated email from the ASF dual-hosted git repository.
yanxinyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/master by this push:
new 1ae0ba6 PHOENIX-5601 Add a new Coprocessor - ViewTTLAware Coprocessor
1ae0ba6 is described below
commit 1ae0ba644f8aa188adfd93af96ae3df74ec43f3d
Author: Jacob Isaac <[email protected]>
AuthorDate: Fri Nov 15 08:58:15 2019 -0800
PHOENIX-5601 Add a new Coprocessor - ViewTTLAware Coprocessor
Signed-off-by: Xinyi Yan <[email protected]>
---
.../java/org/apache/phoenix/end2end/ViewTTLIT.java | 1054 +++++++++++++++++++-
.../phoenix/compile/ServerBuildIndexCompiler.java | 3 +-
.../coprocessor/BaseScannerRegionObserver.java | 6 +-
.../coprocessor/TTLAwareRegionObserver.java | 328 ++++++
.../phoenix/iterate/TableResultIterator.java | 3 +-
.../phoenix/query/ConnectionQueryServicesImpl.java | 10 +
.../java/org/apache/phoenix/util/IndexUtil.java | 100 +-
.../java/org/apache/phoenix/util/ScanUtil.java | 147 +++
.../apache/phoenix/query/PhoenixTestBuilder.java | 338 ++++++-
9 files changed, 1813 insertions(+), 176 deletions(-)
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ViewTTLIT.java
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ViewTTLIT.java
index ee77b33..18a2fa8 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ViewTTLIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ViewTTLIT.java
@@ -18,6 +18,8 @@
package org.apache.phoenix.end2end;
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.Result;
@@ -31,19 +33,32 @@ import org.apache.hadoop.hbase.filter.QualifierFilter;
import org.apache.hadoop.hbase.filter.RowFilter;
import org.apache.hadoop.hbase.filter.SubstringComparator;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.compile.QueryPlan;
+import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
import org.apache.phoenix.exception.SQLExceptionCode;
import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
-import org.apache.phoenix.query.PhoenixTestBuilder;
+import org.apache.phoenix.jdbc.PhoenixResultSet;
+import org.apache.phoenix.jdbc.PhoenixStatement;
+import org.apache.phoenix.query.PhoenixTestBuilder.SchemaBuilder;
+import org.apache.phoenix.query.PhoenixTestBuilder.BasicDataReader;
+import org.apache.phoenix.query.PhoenixTestBuilder.BasicDataWriter;
+import org.apache.phoenix.query.PhoenixTestBuilder.DataReader;
+import org.apache.phoenix.query.PhoenixTestBuilder.DataWriter;
+import org.apache.phoenix.query.PhoenixTestBuilder.DataSupplier;
+import org.apache.phoenix.query.PhoenixTestBuilder.SchemaBuilder.OtherOptions;
import org.apache.phoenix.query.PhoenixTestBuilder.SchemaBuilder.TableOptions;
+import
org.apache.phoenix.query.PhoenixTestBuilder.SchemaBuilder.GlobalViewOptions;
+import
org.apache.phoenix.query.PhoenixTestBuilder.SchemaBuilder.GlobalViewIndexOptions;
import
org.apache.phoenix.query.PhoenixTestBuilder.SchemaBuilder.TenantViewOptions;
import
org.apache.phoenix.query.PhoenixTestBuilder.SchemaBuilder.TenantViewIndexOptions;
import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.schema.PTableType;
+import org.apache.phoenix.schema.types.PDataType;
+import org.apache.phoenix.util.PhoenixRuntime;
import org.apache.phoenix.util.SchemaUtil;
import org.apache.phoenix.util.TestUtil;
import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.sql.Connection;
@@ -51,20 +66,26 @@ import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
-
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+import java.util.Random;
+import java.util.Set;
+
+import static java.util.Arrays.asList;
+import static org.apache.phoenix.query.PhoenixTestBuilder.DDLDefaults.MAX_ROWS;
import static org.apache.phoenix.util.PhoenixRuntime.TENANT_ID_ATTRIB;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
public class ViewTTLIT extends ParallelStatsDisabledIT {
- private static final Logger LOGGER =
LoggerFactory.getLogger(ViewTTLIT.class);
- private static final String ORG_ID_FMT = "00D0x000%s";
- private static final String ID_FMT = "00A0y000%07d";
private static final String VIEW_TTL_HEADER_SQL = "SELECT VIEW_TTL FROM
SYSTEM.CATALOG "
+ "WHERE %s AND TABLE_SCHEM = '%s' AND TABLE_NAME = '%s' AND
TABLE_TYPE = '%s'";
private static final String ALTER_VIEW_TTL_SQL = "ALTER VIEW %s.%s set
VIEW_TTL=%d";
+ private static final int DEFAULT_NUM_ROWS = 5;
// Scans the HBase rows directly for the view ttl related header rows
column and asserts
private void assertViewHeaderRowsHaveViewTTLRelatedCells(String
schemaName, long minTimestamp,
@@ -96,7 +117,6 @@ public class ViewTTLIT extends ParallelStatsDisabledIT {
assertEquals(String.format("Expected rows do not match for table =
%s at timestamp %d",
Bytes.toString(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES),
minTimestamp), expectedRows, numMatchingRows);
}
-
}
private void assertSyscatHaveViewTTLRelatedColumns(String tenantId, String
schemaName, String tableName, String tableType, long ttlValueExpected)
@@ -119,7 +139,6 @@ public class ViewTTLIT extends ParallelStatsDisabledIT {
return name.replace("\"", "");
}
-
/**
* -----------------
* Test methods
@@ -134,7 +153,7 @@ public class ViewTTLIT extends ParallelStatsDisabledIT {
// Define the test schema.
// 1. Table with default columns => (ORG_ID, KP, COL1, COL2, COL3), PK
=> (ORG_ID, KP)
// 2. GlobalView with default columns => (ID, COL4, COL5, COL6), PK =>
(ID)
- final PhoenixTestBuilder.SchemaBuilder schemaBuilder = new
PhoenixTestBuilder.SchemaBuilder(getUrl());
+ final SchemaBuilder schemaBuilder = new SchemaBuilder(getUrl());
schemaBuilder
.withTableDefaults()
.withGlobalViewDefaults()
@@ -145,15 +164,13 @@ public class ViewTTLIT extends ParallelStatsDisabledIT {
assertViewHeaderRowsHaveViewTTLRelatedCells(schemaBuilder.getTableOptions().getSchemaName(),
startTime, true, 2);
}
-
-
@Test
public void testViewTTLWithTableLevelTTLFails() throws Exception {
// Define the test schema.
// 1. Table with default columns => (ORG_ID, KP, COL1, COL2, COL3), PK
=> (ORG_ID, KP)
// 2. Tenant with default columns => (ZID, COL7, COL8, COL9), PK =>
(ZID)
- final PhoenixTestBuilder.SchemaBuilder schemaBuilder = new
PhoenixTestBuilder.SchemaBuilder(getUrl());
+ final SchemaBuilder schemaBuilder = new SchemaBuilder(getUrl());
TableOptions tableOptions = TableOptions.withDefaults();
tableOptions.setTableProps("COLUMN_ENCODED_BYTES=0,MULTI_TENANT=true,TTL=100");
@@ -177,7 +194,7 @@ public class ViewTTLIT extends ParallelStatsDisabledIT {
// Define the test schema.
// 1. Table with default columns => (ORG_ID, KP, COL1, COL2, COL3), PK
=> (ORG_ID, KP)
// 2. Tenant with default columns => (ZID, COL7, COL8, COL9), PK =>
(ZID)
- final PhoenixTestBuilder.SchemaBuilder schemaBuilder = new
PhoenixTestBuilder.SchemaBuilder(getUrl());
+ final SchemaBuilder schemaBuilder = new SchemaBuilder(getUrl());
TableOptions tableOptions = TableOptions.withDefaults();
tableOptions.setTableProps("COLUMN_ENCODED_BYTES=0,MULTI_TENANT=true");
@@ -203,7 +220,7 @@ public class ViewTTLIT extends ParallelStatsDisabledIT {
// Define the test schema.
// 1. Table with default columns => (ORG_ID, KP, COL1, COL2, COL3), PK
=> (ORG_ID, KP)
// 2. Tenant with default columns => (ZID, COL7, COL8, COL9), PK =>
(ZID)
- final PhoenixTestBuilder.SchemaBuilder schemaBuilder = new
PhoenixTestBuilder.SchemaBuilder(getUrl());
+ final SchemaBuilder schemaBuilder = new SchemaBuilder(getUrl());
TableOptions tableOptions = TableOptions.withDefaults();
tableOptions.setTableProps("COLUMN_ENCODED_BYTES=0,MULTI_TENANT=true");
@@ -238,18 +255,18 @@ public class ViewTTLIT extends ParallelStatsDisabledIT {
// 1. Table with default columns => (ORG_ID, KP, COL1, COL2, COL3), PK
=> (ORG_ID, KP)
// 2. GlobalView with default columns => (ID, COL4, COL5, COL6), PK =>
(ID)
// 3. Tenant with default columns => (ZID, COL7, COL8, COL9), PK =>
(ZID)
- final PhoenixTestBuilder.SchemaBuilder schemaBuilder = new
PhoenixTestBuilder.SchemaBuilder(getUrl());
+ final SchemaBuilder schemaBuilder = new SchemaBuilder(getUrl());
TableOptions tableOptions = TableOptions.withDefaults();
tableOptions.setTableProps("COLUMN_ENCODED_BYTES=0,MULTI_TENANT=true");
- PhoenixTestBuilder.SchemaBuilder.GlobalViewOptions
- globalViewOptions =
PhoenixTestBuilder.SchemaBuilder.GlobalViewOptions.withDefaults();
+ SchemaBuilder.GlobalViewOptions
+ globalViewOptions =
SchemaBuilder.GlobalViewOptions.withDefaults();
globalViewOptions.setTableProps("VIEW_TTL=300000");
- PhoenixTestBuilder.SchemaBuilder.GlobalViewIndexOptions
+ SchemaBuilder.GlobalViewIndexOptions
globalViewIndexOptions =
-
PhoenixTestBuilder.SchemaBuilder.GlobalViewIndexOptions.withDefaults();
+ SchemaBuilder.GlobalViewIndexOptions.withDefaults();
globalViewIndexOptions.setLocal(false);
TenantViewOptions tenantViewWithOverrideOptions =
TenantViewOptions.withDefaults();
@@ -274,7 +291,7 @@ public class ViewTTLIT extends ParallelStatsDisabledIT {
assertViewHeaderRowsHaveViewTTLRelatedCells(schemaBuilder.getTableOptions().getSchemaName(),
startTime, false, 4);
assertSyscatHaveViewTTLRelatedColumns("", schemaName, globalViewName,
PTableType.VIEW.getSerializedValue(), 300000);
assertSyscatHaveViewTTLRelatedColumns("", schemaName,
indexOnGlobalViewName, PTableType.INDEX.getSerializedValue(), 300000);
- // Since the VIEW_TTL property values are not being overriden, we
expect the TTL value to be different from the global view.
+ // Since the VIEW_TTL property values are being overridden, we expect
the TTL value to be different from the global view.
assertSyscatHaveViewTTLRelatedColumns(tenantId, schemaName,
tenantViewName, PTableType.VIEW.getSerializedValue(), 1000);
assertSyscatHaveViewTTLRelatedColumns(tenantId, schemaName,
indexOnTenantViewName, PTableType.INDEX.getSerializedValue(), 1000);
@@ -315,7 +332,7 @@ public class ViewTTLIT extends ParallelStatsDisabledIT {
// Define the test schema.
// 1. Table with default columns => (ORG_ID, KP, COL1, COL2, COL3), PK
=> (ORG_ID, KP)
// 2. Tenant with default columns => (ZID, COL7, COL8, COL9), PK =>
(ZID)
- final PhoenixTestBuilder.SchemaBuilder schemaBuilder = new
PhoenixTestBuilder.SchemaBuilder(getUrl());
+ final SchemaBuilder schemaBuilder = new SchemaBuilder(getUrl());
TableOptions tableOptions = TableOptions.withDefaults();
tableOptions.setTableProps("COLUMN_ENCODED_BYTES=0,MULTI_TENANT=true");
@@ -341,7 +358,6 @@ public class ViewTTLIT extends ParallelStatsDisabledIT {
// Since the VIEW_TTL property values are not being overriden, we
expect the TTL value to be different from the global view.
assertSyscatHaveViewTTLRelatedColumns(tenantId, schemaName,
tenantViewName, PTableType.VIEW.getSerializedValue(), 0);
assertSyscatHaveViewTTLRelatedColumns(tenantId, schemaName,
indexOnTenantViewName, PTableType.INDEX.getSerializedValue(), 0);
-
}
@Test
@@ -351,7 +367,7 @@ public class ViewTTLIT extends ParallelStatsDisabledIT {
// Define the test schema.
// 1. Table with default columns => (ORG_ID, KP, COL1, COL2, COL3), PK
=> (ORG_ID, KP)
// 2. Tenant with default columns => (ZID, COL7, COL8, COL9), PK =>
(ZID)
- final PhoenixTestBuilder.SchemaBuilder schemaBuilder = new
PhoenixTestBuilder.SchemaBuilder(getUrl());
+ final SchemaBuilder schemaBuilder = new SchemaBuilder(getUrl());
TableOptions tableOptions = TableOptions.withDefaults();
tableOptions.setTableProps("COLUMN_ENCODED_BYTES=0,MULTI_TENANT=true");
@@ -391,7 +407,997 @@ public class ViewTTLIT extends ParallelStatsDisabledIT {
// Since the VIEW_TTL property values are not being overriden, we
expect the TTL value to be different from the global view.
assertSyscatHaveViewTTLRelatedColumns(tenantId, schemaName,
tenantViewName, PTableType.VIEW.getSerializedValue(), 1000);
assertSyscatHaveViewTTLRelatedColumns(tenantId, schemaName,
indexOnTenantViewName, PTableType.INDEX.getSerializedValue(), 1000);
+ }
+
+ @Test
+ public void testViewTTLForLevelTwoViewWithNoIndexes() throws Exception {
+ long startTime = System.currentTimeMillis();
+
+ // Define the test schema.
+ // 1. Table with default columns => (ORG_ID, KP, COL1, COL2, COL3), PK
=> (ORG_ID, KP)
+ // 2. GlobalView with default columns => (ID, COL4, COL5, COL6), PK =>
(ID)
+ // 3. Tenant with default columns => (ZID, COL7, COL8, COL9), PK =>
(ZID)
+ final SchemaBuilder schemaBuilder = new SchemaBuilder(getUrl());
+
+ TableOptions tableOptions = TableOptions.withDefaults();
+ tableOptions.setTableProps("COLUMN_ENCODED_BYTES=0,MULTI_TENANT=true");
+
+ SchemaBuilder.GlobalViewOptions
+ globalViewOptions =
SchemaBuilder.GlobalViewOptions.withDefaults();
+ globalViewOptions.setTableProps("VIEW_TTL=300000");
+
+ TenantViewOptions tenantViewWithOverrideOptions =
TenantViewOptions.withDefaults();
+ tenantViewWithOverrideOptions.setTableProps("VIEW_TTL=10000");
+ schemaBuilder
+ .withTableOptions(tableOptions)
+ .withGlobalViewOptions(globalViewOptions)
+ .withTenantViewOptions(tenantViewWithOverrideOptions)
+ .buildWithNewTenant();
+
+ String tenantId = schemaBuilder.getDataOptions().getTenantId();
+ String schemaName =
stripQuotes(SchemaUtil.getSchemaNameFromFullName(schemaBuilder.getEntityTenantViewName()));
+ String globalViewName =
stripQuotes(SchemaUtil.getTableNameFromFullName(schemaBuilder.getEntityGlobalViewName()));
+ String tenantViewName =
stripQuotes(SchemaUtil.getTableNameFromFullName(schemaBuilder.getEntityTenantViewName()));
+
+ // Expected 2 rows - one each for the GlobalView and the TenantView.
+ // Since the VIEW_TTL property values are being set, we expect the
view header columns to show up in regular scans too.
+
assertViewHeaderRowsHaveViewTTLRelatedCells(schemaBuilder.getTableOptions().getSchemaName(),
startTime, false, 2);
+ assertSyscatHaveViewTTLRelatedColumns("", schemaName, globalViewName,
PTableType.VIEW.getSerializedValue(), 300000);
+ // Since the VIEW_TTL property values are being overridden, we
expect the TTL value to be different from the global view.
+ assertSyscatHaveViewTTLRelatedColumns(tenantId, schemaName,
tenantViewName, PTableType.VIEW.getSerializedValue(), 10000);
+
+ // Without override
+ startTime = System.currentTimeMillis();
+
+ TenantViewOptions tenantViewWithoutOverrideOptions =
TenantViewOptions.withDefaults();
+ schemaBuilder
+ .withTableOptions(tableOptions)
+ .withGlobalViewOptions(globalViewOptions)
+ .withTenantViewOptions(tenantViewWithoutOverrideOptions)
+ .buildWithNewTenant();
+
+ tenantId = schemaBuilder.getDataOptions().getTenantId();
+ schemaName =
stripQuotes(SchemaUtil.getSchemaNameFromFullName(schemaBuilder.getEntityTenantViewName()));
+ globalViewName =
stripQuotes(SchemaUtil.getTableNameFromFullName(schemaBuilder.getEntityGlobalViewName()));
+ tenantViewName =
stripQuotes(SchemaUtil.getTableNameFromFullName(schemaBuilder.getEntityTenantViewName()));
+
+ // Expected 1 row - one for the TenantView.
+ // Since the VIEW_TTL property values are being set, we expect the
view header columns to show up in regular scans too.
+
assertViewHeaderRowsHaveViewTTLRelatedCells(schemaBuilder.getTableOptions().getSchemaName(),
startTime, false, 1);
+ assertSyscatHaveViewTTLRelatedColumns("", schemaName, globalViewName,
PTableType.VIEW.getSerializedValue(), 300000);
+ // Since the VIEW_TTL property values are not being overridden, we
expect the TTL value to be the same as the global view.
+ assertSyscatHaveViewTTLRelatedColumns(tenantId, schemaName,
tenantViewName, PTableType.VIEW.getSerializedValue(), 300000);
+ }
+
+ @Test
+ public void testWithTenantViewAndNoGlobalView() throws Exception {
+
+ long viewTTL = 10000;
+ TableOptions tableOptions = TableOptions.withDefaults();
+ tableOptions.getTableColumns().clear();
+ tableOptions.getTableColumnTypes().clear();
+
+ TenantViewOptions tenantViewOptions = TenantViewOptions.withDefaults();
+ tenantViewOptions.setTableProps(String.format("VIEW_TTL=%d", viewTTL));
+
+ // Define the test schema.
+ final SchemaBuilder schemaBuilder = new SchemaBuilder(getUrl());
+ schemaBuilder
+ .withTableOptions(tableOptions)
+ .withTenantViewOptions(tenantViewOptions)
+ .build();
+
+ // Define the test data.
+ DataSupplier dataSupplier = new DataSupplier() {
+
+ @Override public List<Object> getValues(int rowIndex) {
+ Random rnd = new Random();
+ String zid = String.format("00A0y000%07d", rowIndex);
+ String col7 = String.format("g%05d", rowIndex +
rnd.nextInt(MAX_ROWS));
+ String col8 = String.format("h%05d", rowIndex +
rnd.nextInt(MAX_ROWS));
+ String col9 = String.format("i%05d", rowIndex +
rnd.nextInt(MAX_ROWS));
+ return Lists.newArrayList(new Object[] { zid, col7, col8, col9
});
+ }
+ };
+
+ // Create a test data reader/writer for the above schema.
+ DataWriter dataWriter = new BasicDataWriter();
+ DataReader dataReader = new BasicDataReader();
+
+ List<String> columns = Lists.newArrayList("ZID", "COL7", "COL8",
"COL9");
+ List<String> rowKeyColumns = Lists.newArrayList("ZID");
+ String
+ tenantConnectUrl =
+ getUrl() + ';' + TENANT_ID_ATTRIB + '=' +
schemaBuilder.getDataOptions()
+ .getTenantId();
+ try (Connection writeConnection = DriverManager
+ .getConnection(tenantConnectUrl)) {
+ writeConnection.setAutoCommit(true);
+ dataWriter.setConnection(writeConnection);
+ dataWriter.setDataSupplier(dataSupplier);
+ dataWriter.setUpsertColumns(columns);
+ dataWriter.setRowKeyColumns(rowKeyColumns);
+
dataWriter.setTargetEntity(schemaBuilder.getEntityTenantViewName());
+
+ dataReader.setValidationColumns(columns);
+ dataReader.setRowKeyColumns(rowKeyColumns);
+ dataReader.setDML(String.format("SELECT %s from %s",
+ Joiner.on(",").join(columns),
+ schemaBuilder.getEntityTenantViewName()));
+
dataReader.setTargetEntity(schemaBuilder.getEntityTenantViewName());
+
+ // Validate data before and after ttl expiration.
+ upsertDataAndRunValidations(viewTTL, DEFAULT_NUM_ROWS, dataWriter,
+ dataReader, schemaBuilder);
+ }
+ }
+
+ @Test public void testWithSQLUsingIndex() throws Exception {
+
+ long viewTTL = 10000;
+ TableOptions tableOptions = TableOptions.withDefaults();
+ tableOptions.getTableColumns().clear();
+ tableOptions.getTableColumnTypes().clear();
+
+ GlobalViewOptions
+ globalViewOptions =
+ SchemaBuilder.GlobalViewOptions.withDefaults();
+ globalViewOptions.setTableProps(String.format("VIEW_TTL=%d", viewTTL));
+
+ GlobalViewIndexOptions
+ globalViewIndexOptions =
+ SchemaBuilder.GlobalViewIndexOptions.withDefaults();
+ globalViewIndexOptions.setLocal(false);
+
+ TenantViewOptions tenantViewOptions = new TenantViewOptions();
+ tenantViewOptions
+ .setTenantViewColumns(asList("ZID", "COL7", "COL8", "COL9"));
+ tenantViewOptions
+ .setTenantViewColumnTypes(asList("CHAR(15)", "VARCHAR",
"VARCHAR", "VARCHAR"));
+ tenantViewOptions.setTableProps(String.format("VIEW_TTL=%d", viewTTL));
+
+ OtherOptions testCaseWhenAllCFMatchAndAllDefault = new OtherOptions();
+
testCaseWhenAllCFMatchAndAllDefault.setTestName("testCaseWhenAllCFMatchAndAllDefault");
+ testCaseWhenAllCFMatchAndAllDefault
+ .setTableCFs(Lists.newArrayList((String) null, null, null));
+ testCaseWhenAllCFMatchAndAllDefault
+ .setGlobalViewCFs(Lists.newArrayList((String) null, null,
null));
+
testCaseWhenAllCFMatchAndAllDefault.setTenantViewCFs(Lists.newArrayList((String)null,
null, null, null));
+
+ // Define the test schema.
+ final SchemaBuilder schemaBuilder = new SchemaBuilder(getUrl());
+ schemaBuilder
+ .withTableOptions(tableOptions)
+ .withGlobalViewOptions(globalViewOptions)
+ .withGlobalViewIndexOptions(globalViewIndexOptions)
+ .withTenantViewOptions(tenantViewOptions)
+ .withOtherOptions(testCaseWhenAllCFMatchAndAllDefault)
+ .build();
+
+
+ // Define the test data.
+ final List<String> outerCol4s = Lists.newArrayList();
+ DataSupplier dataSupplier = new DataSupplier() {
+ String col4ForWhereClause;
+
+ @Override public List<Object> getValues(int rowIndex) {
+ Random rnd = new Random();
+ String id = String.format("00A0y000%07d", rowIndex);
+ String zid = String.format("00B0y000%07d", rowIndex);
+ String
+ col4 =
+ String.format("d%05d", rowIndex +
rnd.nextInt(MAX_ROWS));
+
+ // Store the col4 data to be used later in a where clause
+ outerCol4s.add(col4);
+ String
+ col5 =
+ String.format("e%05d", rowIndex +
rnd.nextInt(MAX_ROWS));
+ String
+ col6 =
+ String.format("f%05d", rowIndex +
rnd.nextInt(MAX_ROWS));
+ String
+ col7 =
+ String.format("g%05d", rowIndex +
rnd.nextInt(MAX_ROWS));
+ String
+ col8 =
+ String.format("h%05d", rowIndex +
rnd.nextInt(MAX_ROWS));
+ String
+ col9 =
+ String.format("i%05d", rowIndex +
rnd.nextInt(MAX_ROWS));
+
+ return Lists.newArrayList(
+ new Object[] { id, zid, col4, col5, col6, col7,
+ col8, col9 });
+ }
+ };
+
+ // Create a test data reader/writer for the above schema.
+ DataWriter dataWriter = new BasicDataWriter();
+ DataReader dataReader = new BasicDataReader();
+
+ List<String> columns = Lists.newArrayList("ID", "ZID",
+ "COL4", "COL5", "COL6", "COL7", "COL8", "COL9");
+ List<String> rowKeyColumns = Lists.newArrayList("COL6");
+ String
+ tenantConnectUrl =
+ getUrl() + ';' + TENANT_ID_ATTRIB + '=' +
schemaBuilder.getDataOptions()
+ .getTenantId();
+ try (Connection writeConnection = DriverManager
+ .getConnection(tenantConnectUrl)) {
+ writeConnection.setAutoCommit(true);
+ dataWriter.setConnection(writeConnection);
+ dataWriter.setDataSupplier(dataSupplier);
+ dataWriter.setUpsertColumns(columns);
+ dataWriter.setRowKeyColumns(rowKeyColumns);
+
dataWriter.setTargetEntity(schemaBuilder.getEntityTenantViewName());
+
+ // Upsert data for validation
+ upsertData(dataWriter, DEFAULT_NUM_ROWS);
+
+ dataReader.setValidationColumns(rowKeyColumns);
+ dataReader.setRowKeyColumns(rowKeyColumns);
+ dataReader.setDML(String.format("SELECT col6 from %s where col4 =
'%s'",
+ schemaBuilder.getEntityTenantViewName(),
outerCol4s.get(1)));
+
dataReader.setTargetEntity(schemaBuilder.getEntityTenantViewName());
+
+ // Validate data before and after ttl expiration.
+ validateExpiredRowsAreNotReturnedUsingCounts(viewTTL, dataReader,
schemaBuilder);
+ }
}
+ @Test public void testWithVariousSQLs() throws Exception {
+
+ long viewTTL = 10000;
+ TableOptions tableOptions = TableOptions.withDefaults();
+ tableOptions.getTableColumns().clear();
+ tableOptions.getTableColumnTypes().clear();
+
+ GlobalViewOptions
+ globalViewOptions =
+ SchemaBuilder.GlobalViewOptions.withDefaults();
+ globalViewOptions.setTableProps(String.format("VIEW_TTL=%d", viewTTL));
+
+ GlobalViewIndexOptions
+ globalViewIndexOptions =
+ SchemaBuilder.GlobalViewIndexOptions.withDefaults();
+ globalViewIndexOptions.setLocal(false);
+
+ TenantViewOptions tenantViewOptions = new TenantViewOptions();
+ tenantViewOptions
+ .setTenantViewColumns(asList("ZID", "COL7", "COL8", "COL9"));
+ tenantViewOptions
+ .setTenantViewColumnTypes(asList("CHAR(15)", "VARCHAR",
"VARCHAR", "VARCHAR"));
+
+ tenantViewOptions.setTableProps(String.format("VIEW_TTL=%d", viewTTL));
+
+ OtherOptions testCaseWhenAllCFMatchAndAllDefault = new OtherOptions();
+
testCaseWhenAllCFMatchAndAllDefault.setTestName("testCaseWhenAllCFMatchAndAllDefault");
+ testCaseWhenAllCFMatchAndAllDefault
+ .setTableCFs(Lists.newArrayList((String) null, null, null));
+ testCaseWhenAllCFMatchAndAllDefault
+ .setGlobalViewCFs(Lists.newArrayList((String) null, null,
null));
+
testCaseWhenAllCFMatchAndAllDefault.setTenantViewCFs(Lists.newArrayList((String)null,
null, null, null));
+
+ // Define the test schema.
+ final SchemaBuilder schemaBuilder = new SchemaBuilder(getUrl());
+ schemaBuilder
+ .withTableOptions(tableOptions)
+ .withGlobalViewOptions(globalViewOptions)
+ .withGlobalViewIndexOptions(globalViewIndexOptions)
+ .withTenantViewOptions(tenantViewOptions)
+ .withOtherOptions(testCaseWhenAllCFMatchAndAllDefault)
+ .build();
+
+
+ // Define the test data.
+ final String groupById = String.format("00A0y000%07d", 0);
+ DataSupplier dataSupplier = new DataSupplier() {
+
+ @Override public List<Object> getValues(int rowIndex) {
+ Random rnd = new Random();
+ String zid = String.format("00B0y000%07d", rowIndex);
+ String
+ col4 =
+ String.format("d%05d", rowIndex +
rnd.nextInt(MAX_ROWS));
+ String
+ col5 =
+ String.format("e%05d", rowIndex +
rnd.nextInt(MAX_ROWS));
+ String
+ col6 =
+ String.format("f%05d", rowIndex +
rnd.nextInt(MAX_ROWS));
+ String
+ col7 =
+ String.format("g%05d", rowIndex +
rnd.nextInt(MAX_ROWS));
+ String
+ col8 =
+ String.format("h%05d", rowIndex +
rnd.nextInt(MAX_ROWS));
+ String
+ col9 =
+ String.format("i%05d", rowIndex +
rnd.nextInt(MAX_ROWS));
+
+ return Lists.newArrayList(
+ new Object[] { groupById, zid, col4, col5, col6, col7,
+ col8, col9 });
+ }
+ };
+
+ // Create a test data reader/writer for the above schema.
+ DataWriter dataWriter = new BasicDataWriter();
+ List<String> columns = Lists.newArrayList("ID", "ZID",
+ "COL4", "COL5", "COL6", "COL7", "COL8", "COL9");
+ List<String> rowKeyColumns = Lists.newArrayList("ID", "ZID");
+ String
+ tenantConnectUrl =
+ getUrl() + ';' + TENANT_ID_ATTRIB + '=' +
schemaBuilder.getDataOptions()
+ .getTenantId();
+ try (Connection writeConnection = DriverManager
+ .getConnection(tenantConnectUrl)) {
+ writeConnection.setAutoCommit(true);
+ dataWriter.setConnection(writeConnection);
+ dataWriter.setDataSupplier(dataSupplier);
+ dataWriter.setUpsertColumns(columns);
+ dataWriter.setRowKeyColumns(rowKeyColumns);
+
dataWriter.setTargetEntity(schemaBuilder.getEntityTenantViewName());
+ upsertData(dataWriter, DEFAULT_NUM_ROWS);
+
+ // Case : count(1) sql
+ DataReader dataReader = new BasicDataReader();
+ dataReader.setValidationColumns(Arrays.asList("num_rows"));
+ dataReader.setRowKeyColumns(Arrays.asList("num_rows"));
+ dataReader.setDML(String.format("SELECT count(1) as num_rows from
%s HAVING count(1) > 0",
+ schemaBuilder.getEntityTenantViewName()));
+
dataReader.setTargetEntity(schemaBuilder.getEntityTenantViewName());
+
+ // Validate data before and after ttl expiration.
+ validateExpiredRowsAreNotReturnedUsingCounts(viewTTL, dataReader,
schemaBuilder);
+
+ // Case : group by sql
+ dataReader.setValidationColumns(Arrays.asList("num_rows"));
+ dataReader.setRowKeyColumns(Arrays.asList("num_rows"));
+ dataReader.setDML(String.format("SELECT count(1) as num_rows from
%s GROUP BY ID HAVING count(1) > 0",
+ schemaBuilder.getEntityTenantViewName(),
+ groupById));
+
+
dataReader.setTargetEntity(schemaBuilder.getEntityTenantViewName());
+
+ // Validate data before and after ttl expiration.
+ validateExpiredRowsAreNotReturnedUsingCounts(viewTTL, dataReader,
schemaBuilder);
+ }
+ }
+
+ @Test
+ public void testWithTenantViewAndGlobalViewAndVariousOptions() throws
Exception {
+ long viewTTL = 10000;
+
+ // Define the test schema
+ TableOptions tableOptions = TableOptions.withDefaults();
+
+ GlobalViewOptions
+ globalViewOptions =
+ SchemaBuilder.GlobalViewOptions.withDefaults();
+ globalViewOptions.setTableProps(String.format("VIEW_TTL=%d", viewTTL));
+
+ GlobalViewIndexOptions
+ globalViewIndexOptions =
+ GlobalViewIndexOptions.withDefaults();
+
+ TenantViewOptions tenantViewOptions = new TenantViewOptions();
+ tenantViewOptions
+ .setTenantViewColumns(asList("ZID", "COL7", "COL8", "COL9"));
+ tenantViewOptions
+ .setTenantViewColumnTypes(asList("CHAR(15)", "VARCHAR",
"VARCHAR", "VARCHAR"));
+ tenantViewOptions.setTableProps(String.format("VIEW_TTL=%d", viewTTL));
+
+ TenantViewIndexOptions
+ tenantViewIndexOptions =
+ TenantViewIndexOptions.withDefaults();
+
+ OtherOptions testCaseWhenAllCFMatchAndAllDefault = new OtherOptions();
+
testCaseWhenAllCFMatchAndAllDefault.setTestName("testCaseWhenAllCFMatchAndAllDefault");
+ testCaseWhenAllCFMatchAndAllDefault
+ .setTableCFs(Lists.newArrayList((String) null, null, null));
+ testCaseWhenAllCFMatchAndAllDefault
+ .setGlobalViewCFs(Lists.newArrayList((String) null, null,
null));
+
testCaseWhenAllCFMatchAndAllDefault.setTenantViewCFs(Lists.newArrayList((String)null,
null, null, null));
+
+ for (String additionalProps : Lists
+ .newArrayList("COLUMN_ENCODED_BYTES=0",
"DEFAULT_COLUMN_FAMILY='0'")) {
+
+ StringBuilder withTableProps = new StringBuilder();
+
withTableProps.append("MULTI_TENANT=true,").append(additionalProps);
+
+ for (boolean isGlobalViewLocal : Lists.newArrayList(true, false)) {
+ for (boolean isTenantViewLocal : Lists.newArrayList(true,
false)) {
+
+ tableOptions.setTableProps(withTableProps.toString());
+ globalViewIndexOptions.setLocal(isGlobalViewLocal);
+ tenantViewIndexOptions.setLocal(isTenantViewLocal);
+ OtherOptions otherOptions =
testCaseWhenAllCFMatchAndAllDefault;
+
+ final SchemaBuilder schemaBuilder = new
SchemaBuilder(getUrl());
+ schemaBuilder
+ .withTableOptions(tableOptions)
+ .withGlobalViewOptions(globalViewOptions)
+ .withGlobalViewIndexOptions(globalViewIndexOptions)
+ .withTenantViewOptions(tenantViewOptions)
+ .withTenantViewIndexOptions(tenantViewIndexOptions)
+
.withOtherOptions(testCaseWhenAllCFMatchAndAllDefault)
+ .buildWithNewTenant();
+
+ // Define the test data.
+ DataSupplier dataSupplier = new DataSupplier() {
+
+ @Override public List<Object> getValues(int rowIndex) {
+ Random rnd = new Random();
+ String id = String.format("00A0y000%07d",
rowIndex);
+ String zid = String.format("00B0y000%07d",
rowIndex);
+ String
+ col1 =
+ String.format("a%05d", rowIndex +
rnd.nextInt(MAX_ROWS));
+ String
+ col2 =
+ String.format("b%05d", rowIndex +
rnd.nextInt(MAX_ROWS));
+ String
+ col3 =
+ String.format("c%05d", rowIndex +
rnd.nextInt(MAX_ROWS));
+ String
+ col4 =
+ String.format("d%05d", rowIndex +
rnd.nextInt(MAX_ROWS));
+ String
+ col5 =
+ String.format("e%05d", rowIndex +
rnd.nextInt(MAX_ROWS));
+ String
+ col6 =
+ String.format("f%05d", rowIndex +
rnd.nextInt(MAX_ROWS));
+ String
+ col7 =
+ String.format("g%05d", rowIndex +
rnd.nextInt(MAX_ROWS));
+ String
+ col8 =
+ String.format("h%05d", rowIndex +
rnd.nextInt(MAX_ROWS));
+ String
+ col9 =
+ String.format("i%05d", rowIndex +
rnd.nextInt(MAX_ROWS));
+
+ return Lists.newArrayList(
+ new Object[] { id, zid, col1, col2, col3,
col4, col5, col6, col7,
+ col8, col9 });
+ }
+ };
+
+ // Create a test data reader/writer for the above schema.
+ DataWriter dataWriter = new BasicDataWriter();
+ DataReader dataReader = new BasicDataReader();
+
+ List<String> columns = Lists.newArrayList("ID", "ZID",
"COL1", "COL2",
+ "COL3", "COL4", "COL5", "COL6", "COL7", "COL8",
"COL9");
+ List<String> rowKeyColumns = Lists.newArrayList("ID",
"ZID");
+ String
+ tenantConnectUrl =
+ getUrl() + ';' + TENANT_ID_ATTRIB + '=' +
schemaBuilder
+ .getDataOptions().getTenantId();
+ try (Connection writeConnection = DriverManager
+ .getConnection(tenantConnectUrl)) {
+ writeConnection.setAutoCommit(true);
+ dataWriter.setConnection(writeConnection);
+ dataWriter.setDataSupplier(dataSupplier);
+ dataWriter.setUpsertColumns(columns);
+ dataWriter.setRowKeyColumns(rowKeyColumns);
+
dataWriter.setTargetEntity(schemaBuilder.getEntityTenantViewName());
+
+ dataReader.setValidationColumns(columns);
+ dataReader.setRowKeyColumns(rowKeyColumns);
+ dataReader.setDML(String.format("SELECT %s from %s",
+ Joiner.on(",").join(columns),
+ schemaBuilder.getEntityTenantViewName()));
+
dataReader.setTargetEntity(schemaBuilder.getEntityTenantViewName());
+
+ // Validate data before and after ttl expiration.
+ upsertDataAndRunValidations(viewTTL, DEFAULT_NUM_ROWS,
+ dataWriter, dataReader, schemaBuilder);
+ }
+
+
+ long scnTimestamp = System.currentTimeMillis()+viewTTL;
+ // Delete data by simulating expiration.
+ deleteData(schemaBuilder, scnTimestamp);
+
+ // Verify after deleting TTL expired data.
+ Properties props = new Properties();
+ props.setProperty("CurrentSCN",
Long.toString(scnTimestamp));
+ try (Connection readConnection = DriverManager
+ .getConnection(tenantConnectUrl, props)) {
+
+ dataReader.setConnection(readConnection);
+ com.google.common.collect.Table<String, String,
Object> fetchedData =
+ fetchData(dataReader);
+ assertTrue("Expired rows should not be fetched",
+ fetchedData.rowKeySet().size() == 0);
+ }
+ }
+ }
+ }
+ }
+
+ @Test
+ public void testGlobalAndTenantViewTTLInheritance() throws Exception {
+ long globalViewTTL = 300000;
+ long tenantViewTTL = 30000;
+
+ // Define the test schema.
+ // 1. Table with default columns => (ORG_ID, KP, COL1, COL2, COL3), PK
=> (ORG_ID, KP)
+ // 2. GlobalView with default columns => (ID, COL4, COL5, COL6), PK =>
(ID)
+ // 3. Tenant with default columns => (ZID, COL7, COL8, COL9), PK =>
(ZID)
+ final SchemaBuilder schemaBuilder = new SchemaBuilder(getUrl());
+
+ TableOptions tableOptions = TableOptions.withDefaults();
+ tableOptions.setTableProps("COLUMN_ENCODED_BYTES=0,MULTI_TENANT=true");
+
+ SchemaBuilder.GlobalViewOptions
+ globalViewOptions =
SchemaBuilder.GlobalViewOptions.withDefaults();
+ globalViewOptions.setTableProps(String.format("VIEW_TTL=%d",
globalViewTTL));
+
+ SchemaBuilder.GlobalViewIndexOptions
+ globalViewIndexOptions =
+ SchemaBuilder.GlobalViewIndexOptions.withDefaults();
+ globalViewIndexOptions.setLocal(false);
+
+ TenantViewOptions tenantViewWithOverrideOptions = new
TenantViewOptions();
+ tenantViewWithOverrideOptions
+ .setTenantViewColumns(asList("ZID", "COL7", "COL8", "COL9"));
+ tenantViewWithOverrideOptions
+ .setTenantViewColumnTypes(asList("CHAR(15)", "VARCHAR",
"VARCHAR", "VARCHAR"));
+
tenantViewWithOverrideOptions.setTableProps(String.format("VIEW_TTL=%d",
tenantViewTTL));
+
+ OtherOptions testCaseWhenAllCFMatchAndAllDefault = new OtherOptions();
+
testCaseWhenAllCFMatchAndAllDefault.setTestName("testCaseWhenAllCFMatchAndAllDefault");
+ testCaseWhenAllCFMatchAndAllDefault
+ .setTableCFs(Lists.newArrayList((String) null, null, null));
+ testCaseWhenAllCFMatchAndAllDefault
+ .setGlobalViewCFs(Lists.newArrayList((String) null, null,
null));
+
testCaseWhenAllCFMatchAndAllDefault.setTenantViewCFs(Lists.newArrayList((String)null,
null, null, null));
+
+ /**
+ * ************************************************************
+ * Case 1: Build schema with TTL overridden by the tenant view.
+ * TTL for GLOBAL_VIEW - 300000
+ * TTL for TENANT_VIEW - 30000
+ * ************************************************************
+ */
+ schemaBuilder
+ .withTableOptions(tableOptions)
+ .withGlobalViewOptions(globalViewOptions)
+ .withGlobalViewIndexOptions(globalViewIndexOptions)
+ .withTenantViewOptions(tenantViewWithOverrideOptions)
+ .withTenantViewIndexDefaults()
+ .withOtherOptions(testCaseWhenAllCFMatchAndAllDefault)
+ .buildWithNewTenant();
+
+ // Define the test data.
+ final String id = String.format("00A0y000%07d", 0);
+ DataSupplier dataSupplier = new DataSupplier() {
+
+ @Override public List<Object> getValues(int rowIndex) {
+ Random rnd = new Random();
+ String zid = String.format("00B0y000%07d", rowIndex);
+ String
+ col1 =
+ String.format("a%05d", rowIndex +
rnd.nextInt(MAX_ROWS));
+ String
+ col2 =
+ String.format("b%05d", rowIndex +
rnd.nextInt(MAX_ROWS));
+ String
+ col3 =
+ String.format("c%05d", rowIndex +
rnd.nextInt(MAX_ROWS));
+ String
+ col4 =
+ String.format("d%05d", rowIndex +
rnd.nextInt(MAX_ROWS));
+ String
+ col5 =
+ String.format("e%05d", rowIndex +
rnd.nextInt(MAX_ROWS));
+ String
+ col6 =
+ String.format("f%05d", rowIndex +
rnd.nextInt(MAX_ROWS));
+ String
+ col7 =
+ String.format("g%05d", rowIndex +
rnd.nextInt(MAX_ROWS));
+ String
+ col8 =
+ String.format("h%05d", rowIndex +
rnd.nextInt(MAX_ROWS));
+ String
+ col9 =
+ String.format("i%05d", rowIndex +
rnd.nextInt(MAX_ROWS));
+
+ return Lists.newArrayList(
+ new Object[] { id, zid, col1, col2, col3, col4, col5,
col6, col7,
+ col8, col9 });
+ }
+ };
+
+ // Create a test data reader/writer for the above schema.
+ DataWriter dataWriter = new BasicDataWriter();
+ List<String> columns = Lists.newArrayList("ID", "ZID",
+ "COL1", "COL2", "COL3", "COL4", "COL5", "COL6", "COL7",
"COL8", "COL9");
+ List<String> rowKeyColumns = Lists.newArrayList("ID", "ZID");
+ String
+ tenant1ConnectUrl =
+ getUrl() + ';' + TENANT_ID_ATTRIB + '=' +
schemaBuilder.getDataOptions()
+ .getTenantId();
+ try (Connection writeConnection = DriverManager
+ .getConnection(tenant1ConnectUrl)) {
+ writeConnection.setAutoCommit(true);
+ dataWriter.setConnection(writeConnection);
+ dataWriter.setDataSupplier(dataSupplier);
+ dataWriter.setUpsertColumns(columns);
+ dataWriter.setRowKeyColumns(rowKeyColumns);
+
dataWriter.setTargetEntity(schemaBuilder.getEntityTenantViewName());
+ upsertData(dataWriter, DEFAULT_NUM_ROWS);
+
+ // Case : count(1) sql
+ DataReader dataReader = new BasicDataReader();
+ dataReader.setValidationColumns(Arrays.asList("num_rows"));
+ dataReader.setRowKeyColumns(Arrays.asList("num_rows"));
+ dataReader.setDML(String.format("SELECT count(1) as num_rows from
%s HAVING count(1) > 0",
+ schemaBuilder.getEntityTenantViewName()));
+
dataReader.setTargetEntity(schemaBuilder.getEntityTenantViewName());
+
+ // Validate data before and after ttl expiration.
+ // Use the ttl overridden by the tenant.
+ validateExpiredRowsAreNotReturnedUsingCounts(tenantViewTTL,
dataReader, schemaBuilder);
+ }
+
+ /**
+ * ************************************************************
+ * Case 2: Build schema with TTL NOT overridden by the tenant view.
+ * TTL for GLOBAL_VIEW - 300000
+ * TTL for TENANT_VIEW - 300000
+ * ************************************************************
+ */
+ TenantViewOptions tenantViewWithoutOverrideOptions = new
TenantViewOptions();
+ tenantViewWithoutOverrideOptions
+ .setTenantViewColumns(asList("ZID", "COL7", "COL8", "COL9"));
+ tenantViewWithoutOverrideOptions
+ .setTenantViewColumnTypes(asList("CHAR(15)", "VARCHAR",
"VARCHAR", "VARCHAR"));
+
+ schemaBuilder
+ .withTableOptions(tableOptions)
+ .withGlobalViewOptions(globalViewOptions)
+ .withGlobalViewIndexOptions(globalViewIndexOptions)
+ .withTenantViewOptions(tenantViewWithoutOverrideOptions)
+ .withTenantViewIndexDefaults()
+ .withOtherOptions(testCaseWhenAllCFMatchAndAllDefault)
+ .buildWithNewTenant();
+
+ String
+ tenant2ConnectUrl =
+ getUrl() + ';' + TENANT_ID_ATTRIB + '=' +
schemaBuilder.getDataOptions()
+ .getTenantId();
+ try (Connection writeConnection = DriverManager
+ .getConnection(tenant2ConnectUrl)) {
+ writeConnection.setAutoCommit(true);
+ dataWriter.setConnection(writeConnection);
+ dataWriter.setDataSupplier(dataSupplier);
+ dataWriter.setUpsertColumns(columns);
+ dataWriter.setRowKeyColumns(rowKeyColumns);
+
dataWriter.setTargetEntity(schemaBuilder.getEntityTenantViewName());
+ upsertData(dataWriter, DEFAULT_NUM_ROWS);
+
+ // Case : count(1) sql
+ DataReader dataReader = new BasicDataReader();
+ dataReader.setValidationColumns(Arrays.asList("num_rows"));
+ dataReader.setRowKeyColumns(Arrays.asList("num_rows"));
+ dataReader.setDML(String.format("SELECT count(1) as num_rows from
%s HAVING count(1) > 0",
+ schemaBuilder.getEntityTenantViewName()));
+
dataReader.setTargetEntity(schemaBuilder.getEntityTenantViewName());
+
+ // Validate data exists before ttl expiration.
+ long probeTimestamp = System.currentTimeMillis() + globalViewTTL/2;
+ validateRowsAreNotMaskedUsingCounts(probeTimestamp, dataReader,
schemaBuilder);
+ // Validate data before and after ttl expiration.
+ // Use the global view ttl since that is what the view has
inherited.
+ validateExpiredRowsAreNotReturnedUsingCounts(globalViewTTL,
dataReader, schemaBuilder);
+ }
+ }
+
+ @Test public void testDeleteIfExpiredOnTenantView() throws Exception {
+
+ long viewTTL = 180000;
+ TableOptions tableOptions = TableOptions.withDefaults();
+ tableOptions.getTableColumns().clear();
+ tableOptions.getTableColumnTypes().clear();
+
+ TenantViewOptions tenantViewOptions = TenantViewOptions.withDefaults();
+ tenantViewOptions.setTableProps(String.format("VIEW_TTL=%d", viewTTL));
+
+ // Define the test schema.
+ final SchemaBuilder schemaBuilder = new SchemaBuilder(getUrl());
+
schemaBuilder.withTableOptions(tableOptions).withTenantViewOptions(tenantViewOptions)
+ .build();
+
+ // Define the test data.
+ DataSupplier dataSupplier = new DataSupplier() {
+
+ @Override public List<Object> getValues(int rowIndex) {
+ Random rnd = new Random();
+ String zid = String.format("00A0y000%07d", rowIndex);
+ String col7 = String.format("g%05d", rowIndex +
rnd.nextInt(MAX_ROWS));
+ String col8 = String.format("h%05d", rowIndex +
rnd.nextInt(MAX_ROWS));
+ String col9 = String.format("i%05d", rowIndex +
rnd.nextInt(MAX_ROWS));
+ return Lists.newArrayList(new Object[] { zid, col7, col8, col9
});
+ }
+ };
+
+ // Create a test data reader/writer for the above schema.
+ DataWriter dataWriter = new BasicDataWriter();
+ DataReader dataReader = new BasicDataReader();
+
+ List<String> columns = Lists.newArrayList("ZID", "COL7", "COL8",
"COL9");
+ List<String> rowKeyColumns = Lists.newArrayList("ZID");
+ String
+ tenantConnectUrl =
+ getUrl() + ';' + TENANT_ID_ATTRIB + '=' +
schemaBuilder.getDataOptions()
+ .getTenantId();
+ try (Connection writeConnection =
DriverManager.getConnection(tenantConnectUrl)) {
+ writeConnection.setAutoCommit(true);
+ dataWriter.setConnection(writeConnection);
+ dataWriter.setDataSupplier(dataSupplier);
+ dataWriter.setUpsertColumns(columns);
+ dataWriter.setRowKeyColumns(rowKeyColumns);
+
dataWriter.setTargetEntity(schemaBuilder.getEntityTenantViewName());
+
+ dataReader.setValidationColumns(columns);
+ dataReader.setRowKeyColumns(rowKeyColumns);
+ dataReader.setDML(String.format("SELECT %s from %s",
Joiner.on(",").join(columns),
+ schemaBuilder.getEntityTenantViewName()));
+
dataReader.setTargetEntity(schemaBuilder.getEntityTenantViewName());
+
+ // Validate data before and after ttl expiration.
+ upsertDataAndRunValidations(viewTTL, DEFAULT_NUM_ROWS, dataWriter,
+ dataReader, schemaBuilder);
+ }
+
+ long scnTimestamp = System.currentTimeMillis()+viewTTL;
+ // Delete data by simulating expiration.
+ deleteData(schemaBuilder, scnTimestamp);
+
+ // Verify after deleting TTL expired data.
+ Properties props = new Properties();
+ props.setProperty("CurrentSCN", Long.toString(scnTimestamp));
+ try (Connection readConnection = DriverManager
+ .getConnection(tenantConnectUrl, props)) {
+
+ dataReader.setConnection(readConnection);
+ com.google.common.collect.Table<String, String, Object>
fetchedData =
+ fetchData(dataReader);
+ assertTrue("Expired rows should not be fetched",
+ fetchedData.rowKeySet().size() == 0);
+ }
+ }
+
+ /**
+ * Upserts the same number of rows twice through {@code dataWriter} and,
+ * after each upsert, checks that the rows are readable now and are no
+ * longer returned once the view TTL has elapsed.
+ *
+ * @param viewTTL TTL handed to the validation helper
+ * @param numRowsToUpsert number of rows written on each pass
+ * @param dataWriter writer used to upsert the rows
+ * @param dataReader reader used by the validation helper to fetch rows
+ * @param schemaBuilder schema whose tenant id is used to build the connections
+ */
+ private void upsertDataAndRunValidations(long viewTTL, int numRowsToUpsert,
+ DataWriter dataWriter, DataReader dataReader,
+ SchemaBuilder schemaBuilder)
+ throws IOException, SQLException {
+
+ //Insert for the first time and validate them.
+ validateExpiredRowsAreNotReturnedUsingData(viewTTL,
upsertData(dataWriter, numRowsToUpsert),
+ dataReader,
+ schemaBuilder);
+
+ // Update the above rows and validate the same.
+ validateExpiredRowsAreNotReturnedUsingData(viewTTL,
upsertData(dataWriter, numRowsToUpsert),
+ dataReader,
+ schemaBuilder);
+
+ }
+
+ private void validateExpiredRowsAreNotReturnedUsingCounts(
+ long viewTTL,
+ DataReader dataReader,
+ SchemaBuilder schemaBuilder) throws SQLException {
+
+ String
+ tenantConnectUrl =
+ getUrl() + ';' + TENANT_ID_ATTRIB + '=' +
schemaBuilder.getDataOptions()
+ .getTenantId();
+
+ // Verify before TTL expiration
+ try (Connection readConnection = DriverManager
+ .getConnection(tenantConnectUrl)) {
+
+ dataReader.setConnection(readConnection);
+ com.google.common.collect.Table<String, String, Object>
fetchedData =
+ fetchData(dataReader);
+ assertTrue("Fetched data should not be null",
+ fetchedData != null);
+ assertTrue("Rows should exists before expiration",
+ fetchedData.rowKeySet().size() > 0);
+ }
+
+ // Verify after TTL expiration
+ long scnTimestamp = System.currentTimeMillis();
+ Properties props = new Properties();
+ props.setProperty("CurrentSCN",
Long.toString(scnTimestamp+(2*viewTTL)));
+ try (Connection readConnection = DriverManager
+ .getConnection(tenantConnectUrl, props)) {
+
+ dataReader.setConnection(readConnection);
+ com.google.common.collect.Table<String, String, Object>
fetchedData =
+ fetchData(dataReader);
+ assertTrue("Fetched data should not be null",
+ fetchedData != null);
+ assertTrue("Expired rows should not be fetched",
+ fetchedData.rowKeySet().size() == 0);
+ }
+ }
+
+ private void validateExpiredRowsAreNotReturnedUsingData(
+ long viewTTL,
+ com.google.common.collect.Table<String, String, Object>
upsertedData,
+ DataReader dataReader,
+ SchemaBuilder schemaBuilder) throws SQLException {
+
+ String
+ tenantConnectUrl =
+ getUrl() + ';' + TENANT_ID_ATTRIB + '=' +
schemaBuilder.getDataOptions()
+ .getTenantId();
+
+ // Verify before TTL expiration
+ try (Connection readConnection = DriverManager
+ .getConnection(tenantConnectUrl)) {
+
+ dataReader.setConnection(readConnection);
+ com.google.common.collect.Table<String, String, Object>
fetchedData =
+ fetchData(dataReader);
+ assertTrue("Upserted data should not be null",
+ upsertedData != null);
+ assertTrue("Fetched data should not be null",
+ fetchedData != null);
+
+ verifyRowsBeforeTTLExpiration(upsertedData, fetchedData);
+ }
+
+ // Verify after TTL expiration
+ long scnTimestamp = System.currentTimeMillis();
+ Properties props = new Properties();
+ props.setProperty("CurrentSCN",
Long.toString(scnTimestamp+(2*viewTTL)));
+ try (Connection readConnection = DriverManager
+ .getConnection(tenantConnectUrl, props)) {
+
+ dataReader.setConnection(readConnection);
+ com.google.common.collect.Table<String, String, Object>
fetchedData =
+ fetchData(dataReader);
+ assertTrue("Fetched data should not be null",
+ fetchedData != null);
+ assertTrue("Expired rows should not be fetched",
+ fetchedData.rowKeySet().size() == 0);
+ }
+
+ }
+
+ private void validateRowsAreNotMaskedUsingCounts(
+ long probeTimestamp,
+ DataReader dataReader,
+ SchemaBuilder schemaBuilder) throws SQLException {
+
+ String
+ tenantConnectUrl =
+ getUrl() + ';' + TENANT_ID_ATTRIB + '=' +
schemaBuilder.getDataOptions()
+ .getTenantId();
+
+ // Verify rows exists (not masked) at current time
+ try (Connection readConnection = DriverManager
+ .getConnection(tenantConnectUrl)) {
+
+ dataReader.setConnection(readConnection);
+ com.google.common.collect.Table<String, String, Object>
fetchedData =
+ fetchData(dataReader);
+ assertTrue("Fetched data should not be null",
+ fetchedData != null);
+ assertTrue("Rows should exists before ttl expiration (now)",
+ fetchedData.rowKeySet().size() > 0);
+ }
+
+ // Verify rows exists (not masked) at probed timestamp
+ Properties props = new Properties();
+ props.setProperty("CurrentSCN", Long.toString(probeTimestamp));
+ try (Connection readConnection = DriverManager
+ .getConnection(tenantConnectUrl, props)) {
+
+ dataReader.setConnection(readConnection);
+ com.google.common.collect.Table<String, String, Object>
fetchedData =
+ fetchData(dataReader);
+ assertTrue("Fetched data should not be null",
+ fetchedData != null);
+ assertTrue("Rows should exists before ttl expiration
(probe-timestamp)",
+ fetchedData.rowKeySet().size() > 0);
+ }
+ }
+
+ private void verifyRowsBeforeTTLExpiration(
+ com.google.common.collect.Table<String, String, Object>
upsertedData,
+ com.google.common.collect.Table<String, String, Object>
fetchedData) {
+
+ Set<String> upsertedRowKeys = upsertedData.rowKeySet();
+ Set<String> fetchedRowKeys = fetchedData.rowKeySet();
+ assertTrue("Upserted row keys should not be null",
+ upsertedRowKeys != null);
+ assertTrue("Fetched row keys should not be null",
+ fetchedRowKeys != null);
+ assertTrue("Rows upserted and fetched do not match",
+ upsertedRowKeys.equals(fetchedRowKeys));
+
+
+ Set<String> fetchedCols = fetchedData.columnKeySet();
+ for (String rowKey : fetchedRowKeys) {
+ for (String columnKey : fetchedCols) {
+ Object upsertedValue = upsertedData.get(rowKey, columnKey);
+ Object fetchedValue = fetchedData.get(rowKey, columnKey);
+ assertTrue("Upserted values should not be null",
+ upsertedValue != null);
+ assertTrue("Fetched values should not be null",
+ fetchedValue != null);
+ assertTrue("Values upserted and fetched do not match",
+ upsertedValue.equals(fetchedValue));
+ }
+ }
+ }
+
+ /**
+ * Upserts {@code numRowsToUpsert} rows through the given writer.
+ *
+ * @return the data table reported by the writer after the upsert
+ */
+ private com.google.common.collect.Table<String, String, Object> upsertData(
+ DataWriter dataWriter, int numRowsToUpsert) throws SQLException {
+ // Upsert rows
+ dataWriter.upsertRows(numRowsToUpsert);
+ return dataWriter.getDataTable();
+ }
+
+ /**
+ * Reads rows through the given reader (using whatever connection and DML
+ * the caller configured on it).
+ *
+ * @return the data table reported by the reader after the read
+ */
+ private com.google.common.collect.Table<String, String, Object> fetchData(
+ DataReader dataReader) throws SQLException {
+
+ dataReader.readRows();
+ return dataReader.getDataTable();
+ }
+
+    /**
+     * Runs a "delete if expired" scan over the tenant view of {@code schemaBuilder}.
+     *
+     * A tenant connection is opened with {@code CurrentSCN} set to {@code scnTimestamp}, a
+     * {@code select *} plan is built for the view, and the scan is tagged with the empty
+     * column family/qualifier, the view TTL, and the delete (not mask) flag. TTLAwareRegionObserver
+     * issues the deletes from inside its scanner's next()/nextRaw(), so the result set is drained
+     * here to drive the scan over the rows.
+     *
+     * @param schemaBuilder schema providing the tenant view name and tenant id
+     * @param scnTimestamp timestamp used as the connection's CurrentSCN
+     * @throws SQLException if the connection, the plan or the scan fails
+     */
+    private void deleteData(SchemaBuilder schemaBuilder, long scnTimestamp)
+            throws SQLException {
+
+        String viewName = schemaBuilder.getEntityTenantViewName();
+
+        Properties props = new Properties();
+        props.setProperty("CurrentSCN", Long.toString(scnTimestamp));
+        String tenantConnectUrl =
+                getUrl() + ';' + TENANT_ID_ATTRIB + '='
+                        + schemaBuilder.getDataOptions().getTenantId();
+
+        try (Connection deleteConnection =
+                DriverManager.getConnection(tenantConnectUrl, props);
+                final Statement statement = deleteConnection.createStatement()) {
+            deleteConnection.setAutoCommit(true);
+
+            final String deleteIfExpiredStatement =
+                    String.format("select * from %s", viewName);
+            Preconditions.checkNotNull(deleteIfExpiredStatement);
+
+            final PhoenixStatement pstmt = statement.unwrap(PhoenixStatement.class);
+            // Optimize the query plan so that we potentially use secondary indexes
+            final QueryPlan queryPlan = pstmt.optimizeQuery(deleteIfExpiredStatement);
+            final Scan scan = queryPlan.getContext().getScan();
+
+            PTable table = PhoenixRuntime.getTable(deleteConnection,
+                    schemaBuilder.getDataOptions().getTenantId(), viewName);
+
+            byte[] emptyColumnFamilyName = SchemaUtil.getEmptyColumnFamily(table);
+            byte[] emptyColumnName =
+                    table.getEncodingScheme()
+                            == PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS
+                            ? QueryConstants.EMPTY_COLUMN_BYTES
+                            : table.getEncodingScheme()
+                                    .encode(QueryConstants.ENCODED_EMPTY_COLUMN_NAME);
+
+            // The scan attributes must be in place before queryPlan.iterator() is called below.
+            scan.setAttribute(BaseScannerRegionObserver.EMPTY_COLUMN_FAMILY_NAME,
+                    emptyColumnFamilyName);
+            scan.setAttribute(BaseScannerRegionObserver.EMPTY_COLUMN_QUALIFIER_NAME,
+                    emptyColumnName);
+            scan.setAttribute(BaseScannerRegionObserver.DELETE_VIEW_TTL_EXPIRED,
+                    PDataType.TRUE_BYTES);
+            scan.setAttribute(BaseScannerRegionObserver.MASK_VIEW_TTL_EXPIRED,
+                    PDataType.FALSE_BYTES);
+            scan.setAttribute(BaseScannerRegionObserver.VIEW_TTL,
+                    Bytes.toBytes(Long.valueOf(table.getViewTTL())));
+
+            // Drain and close the result set: creating it without reading leaves the
+            // delete scan unconsumed and the result set open.
+            try (PhoenixResultSet rs = pstmt.newResultSet(queryPlan.iterator(),
+                    queryPlan.getProjector(), queryPlan.getContext())) {
+                while (rs.next()) {
+                    // Rows are only pulled to drive the server-side scan.
+                }
+            }
+        }
+    }
}
diff --git
a/phoenix-core/src/main/java/org/apache/phoenix/compile/ServerBuildIndexCompiler.java
b/phoenix-core/src/main/java/org/apache/phoenix/compile/ServerBuildIndexCompiler.java
index 4392e23..a37b068 100644
---
a/phoenix-core/src/main/java/org/apache/phoenix/compile/ServerBuildIndexCompiler.java
+++
b/phoenix-core/src/main/java/org/apache/phoenix/compile/ServerBuildIndexCompiler.java
@@ -35,11 +35,10 @@ import org.apache.phoenix.schema.*;
import org.apache.phoenix.schema.tuple.Tuple;
import org.apache.phoenix.schema.types.PLong;
import org.apache.phoenix.util.ByteUtil;
-import org.apache.phoenix.util.IndexUtil;
import org.apache.phoenix.util.ScanUtil;
import static org.apache.phoenix.schema.types.PDataType.TRUE_BYTES;
-import static org.apache.phoenix.util.IndexUtil.addEmptyColumnToScan;
+import static org.apache.phoenix.util.ScanUtil.addEmptyColumnToScan;
/**
diff --git
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
index 272e1fa..46866fd 100644
---
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
+++
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
@@ -93,6 +93,9 @@ abstract public class BaseScannerRegionObserver implements
RegionObserver {
public static final String RUN_UPDATE_STATS_ASYNC_ATTRIB =
"_RunUpdateStatsAsync";
public static final String SKIP_REGION_BOUNDARY_CHECK =
"_SKIP_REGION_BOUNDARY_CHECK";
public static final String TX_SCN = "_TxScn";
+ public static final String VIEW_TTL = "_ViewTTL";
+ public static final String MASK_VIEW_TTL_EXPIRED = "_MASK_TTL_EXPIRED";
+ public static final String DELETE_VIEW_TTL_EXPIRED = "_DELETE_TTL_EXPIRED";
public static final String SCAN_ACTUAL_START_ROW = "_ScanActualStartRow";
public static final String REPLAY_WRITES = "_IGNORE_NEWER_MUTATIONS";
public final static String SCAN_OFFSET = "_RowOffset";
@@ -116,7 +119,7 @@ abstract public class BaseScannerRegionObserver implements
RegionObserver {
// In case of Index Write failure, we need to determine that Index mutation
// is part of normal client write or Index Rebuilder. # PHOENIX-5080
public final static byte[] REPLAY_INDEX_REBUILD_WRITES =
PUnsignedTinyint.INSTANCE.toBytes(3);
-
+
public enum ReplayWrite {
TABLE_AND_INDEX,
INDEX_ONLY,
@@ -209,6 +212,7 @@ abstract public class BaseScannerRegionObserver implements
RegionObserver {
TimeRange timeRange = scan.getTimeRange();
scan.setTimeRange(timeRange.getMin(), Bytes.toLong(txnScn));
}
+
if (isRegionObserverFor(scan)) {
// For local indexes, we need to throw if out of region as we'll
get inconsistent
// results otherwise while in other cases, it may just mean out
client-side data
diff --git
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/TTLAwareRegionObserver.java
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/TTLAwareRegionObserver.java
new file mode 100644
index 0000000..f9749f0
--- /dev/null
+++
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/TTLAwareRegionObserver.java
@@ -0,0 +1,328 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.coprocessor;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.io.TimeRange;
+import org.apache.hadoop.hbase.regionserver.Region;
+import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.regionserver.ScannerContext;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
+import org.apache.phoenix.util.ScanUtil;
+import org.apache.phoenix.util.ServerUtil;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableSet;
+
+import static
org.apache.phoenix.coprocessor.BaseScannerRegionObserver.EMPTY_COLUMN_FAMILY_NAME;
+import static
org.apache.phoenix.coprocessor.BaseScannerRegionObserver.EMPTY_COLUMN_QUALIFIER_NAME;
+import static
org.apache.phoenix.query.QueryConstants.ENCODED_EMPTY_COLUMN_NAME;
+
+/**
+ *
+ * Coprocessor that checks whether the row is expired based on the TTL spec.
+ *
+ */
+public class TTLAwareRegionObserver extends BaseRegionObserver {
+ private static final Log LOG =
LogFactory.getLog(TTLAwareRegionObserver.class);
+
+ /**
+ * A region scanner that checks the TTL expiration of rows
+ */
+ private static class TTLAwareRegionScanner implements RegionScanner {
+ RegionScanner scanner;
+ private Scan scan;
+ private byte[] emptyCF;
+ private byte[] emptyCQ;
+ private Region region;
+ private long minTimestamp;
+ private long maxTimestamp;
+ private long now;
+ private boolean deleteIfExpired;
+ private boolean maskIfExpired;
+
+        /**
+         * Wraps {@code scanner} so that rows can be masked or deleted based on the view TTL
+         * carried by {@code scan}.
+         *
+         * @param env coprocessor environment; supplies the region the scan runs against
+         * @param scan the client scan; its attributes select the mode and the empty column,
+         *             and its time range may be narrowed here when a TX_SCN attribute is set
+         * @param scanner the region scanner to delegate to
+         * @throws IOException declared for callers; propagated from the HBase calls made here
+         */
+        public TTLAwareRegionScanner(RegionCoprocessorEnvironment env,
+                Scan scan,
+                RegionScanner scanner) throws IOException {
+            this.scan = scan;
+            this.scanner = scanner;
+
+            // Delete mode takes precedence: masking is only enabled when delete was not requested.
+            deleteIfExpired = ScanUtil.isDeleteTTLExpiredRows(scan);
+            maskIfExpired = !deleteIfExpired && ScanUtil.isMaskTTLExpiredRows(scan);
+
+            region = env.getRegion();
+            emptyCF = scan.getAttribute(EMPTY_COLUMN_FAMILY_NAME);
+            emptyCQ = scan.getAttribute(EMPTY_COLUMN_QUALIFIER_NAME);
+            byte[] txnScn = scan.getAttribute(BaseScannerRegionObserver.TX_SCN);
+            if (txnScn != null) {
+                // Cap the upper bound of the scan's time range at the supplied TX_SCN.
+                TimeRange timeRange = scan.getTimeRange();
+                scan.setTimeRange(timeRange.getMin(), Bytes.toLong(txnScn));
+            }
+            minTimestamp = scan.getTimeRange().getMin();
+            maxTimestamp = scan.getTimeRange().getMax();
+            // "now" is the scan's upper time bound when one was set, otherwise the server clock.
+            now = maxTimestamp != HConstants.LATEST_TIMESTAMP
+                    ? maxTimestamp
+                    : EnvironmentEdgeManager.currentTimeMillis();
+        }
+
+ @Override
+ public int getBatch() {
+ return scanner.getBatch();
+ }
+
+ @Override
+ public long getMaxResultSize() {
+ return scanner.getMaxResultSize();
+ }
+
+ /**
+ * Pulls rows from the wrapped scanner until one should be handed to the
+ * caller: an empty result, a row that is not expired (mask mode), or a row
+ * for which deleteRowIfExpired returned true (delete mode). Every other
+ * row is cleared from {@code result} and the loop continues while the
+ * wrapped scanner reports more rows.
+ *
+ * @param result output list filled by the wrapped scanner
+ * @return the "has more" flag last reported by the wrapped scanner
+ * @throws IOException any Throwable raised in the loop is passed to
+ * ServerUtil.throwIOException together with the region name
+ */
+ @Override
+ public boolean next(List<Cell> result) throws IOException {
+ try {
+ boolean hasMore;
+ do {
+ hasMore = scanner.next(result);
+ if (result.isEmpty()) {
+ break;
+ }
+
+ if (maskIfExpired && checkRowNotExpired(result)) {
+ break;
+ }
+
+ if (deleteIfExpired && deleteRowIfExpired(result)) {
+ break;
+ }
+ // skip this row
+ // 1. if the row has expired (checkRowNotExpired returned false)
+ // 2. if the row was not deleted (deleteRowIfExpired returned false and
+ // do not want it to count towards the deleted count)
+ result.clear();
+ } while (hasMore);
+ return hasMore;
+ } catch (Throwable t) {
+
ServerUtil.throwIOException(region.getRegionInfo().getRegionNameAsString(), t);
+ return false; // impossible
+ }
+ }
+
+ @Override
+ public boolean next(List<Cell> result, ScannerContext scannerContext)
throws IOException {
+ throw new IOException("next with scannerContext should not be
called in Phoenix environment");
+ }
+
+ @Override
+ public boolean nextRaw(List<Cell> result, ScannerContext
scannerContext) throws IOException {
+ throw new IOException("NextRaw with scannerContext should not be
called in Phoenix environment");
+ }
+
+ @Override
+ public void close() throws IOException {
+ scanner.close();
+ }
+
+ @Override
+ public RegionInfo getRegionInfo() {
+ return scanner.getRegionInfo();
+ }
+
+ @Override
+ public boolean isFilterDone() throws IOException {
+ return scanner.isFilterDone();
+ }
+
+ @Override
+ public boolean reseek(byte[] row) throws IOException {
+ return scanner.reseek(row);
+ }
+
+ @Override
+ public long getMvccReadPoint() {
+ return scanner.getMvccReadPoint();
+ }
+
+ /**
+ * Same filtering loop as next(List), but driven by the wrapped scanner's
+ * nextRaw: rows are pulled until the result is empty, a row is not expired
+ * (mask mode), or deleteRowIfExpired returned true (delete mode). Every
+ * other row is cleared from {@code result} and skipped.
+ *
+ * @param result output list filled by the wrapped scanner
+ * @return the "has more" flag last reported by the wrapped scanner
+ * @throws IOException any Throwable raised in the loop is passed to
+ * ServerUtil.throwIOException together with the region name
+ */
+ @Override
+ public boolean nextRaw(List<Cell> result) throws IOException {
+ try {
+ boolean hasMore;
+ do {
+ hasMore = scanner.nextRaw(result);
+ if (result.isEmpty()) {
+ break;
+ }
+ if (maskIfExpired && checkRowNotExpired(result)) {
+ break;
+ }
+
+ if (deleteIfExpired && deleteRowIfExpired(result)) {
+ break;
+ }
+ // skip this row
+ // 1. if the row has expired (checkRowNotExpired returned false)
+ // 2. if the row was not deleted (deleteRowIfExpired returned false and
+ // do not want it to count towards the deleted count)
+ result.clear();
+ } while (hasMore);
+ return hasMore;
+ } catch (Throwable t) {
+
ServerUtil.throwIOException(region.getRegionInfo().getRegionNameAsString(), t);
+ return false; // impossible
+ }
+ }
+
+        /**
+         * Issues a row delete for the row in {@code cellList} when its TTL has expired.
+         *
+         * @param cellList cells of the current row; it may be modified, because the expiry check
+         *                 removes the empty column cell when the row holds other cells as well
+         * @return true if the list is empty or the row was expired and a delete was issued,
+         *         otherwise false
+         * @throws IOException if the expiry check or the region mutation fails
+         */
+        private boolean deleteRowIfExpired(List<Cell> cellList) throws IOException {
+
+            if (cellList.isEmpty()) {
+                return true;
+            }
+
+            // Grab the first cell before the expiry check, which may remove cells from the list.
+            Cell firstCell = cellList.iterator().next();
+            if (checkRowNotExpired(cellList)) {
+                return false;
+            }
+
+            // Only expired rows need a copy of the row key.
+            byte[] rowKey = new byte[firstCell.getRowLength()];
+            System.arraycopy(firstCell.getRowArray(), firstCell.getRowOffset(),
+                    rowKey, 0, firstCell.getRowLength());
+
+            long ttl = ScanUtil.getViewTTL(this.scan);
+            long deleteTimestamp = now - ttl;
+            // Guarded so the message formatting and the max-timestamp pass over the cells
+            // are skipped when debug logging is off.
+            if (LOG.isDebugEnabled()) {
+                LOG.debug(String.format(
+                        "***** VIEW-TTL: Deleting region = %s, row = %s, delete-ts = %d, max-ts = %d ****** ",
+                        region.getRegionInfo().getTable().getNameAsString(),
+                        Bytes.toString(rowKey),
+                        deleteTimestamp, getMaxTimestamp(cellList)));
+            }
+            Delete del = new Delete(rowKey, deleteTimestamp);
+            region.batchMutate(new Mutation[] { del });
+            return true;
+        }
+
+
+        /**
+         * Tells whether {@code cell} belongs to the empty column, i.e. both its family and its
+         * qualifier match the emptyCF/emptyCQ values taken from the scan attributes.
+         */
+        private boolean isEmptyColumn(Cell cell) {
+            int familyDiff = Bytes.compareTo(
+                    cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(),
+                    emptyCF, 0, emptyCF.length);
+            if (familyDiff != 0) {
+                return false;
+            }
+            int qualifierDiff = Bytes.compareTo(
+                    cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength(),
+                    emptyCQ, 0, emptyCQ.length);
+            return qualifierDiff == 0;
+        }
+
+ // TODO : Remove it after we verify all SQLs include the empty column.
+ // Before we added ScanUtil.addEmptyColumnToScan some queries like select count(*) did not include
+ // the empty column in scan, thus this method was the fallback in those cases.
+ /**
+ * Fallback expiry check: reads the empty column of {@code rowKey} straight
+ * from the region, limited to the scan's time range.
+ *
+ * @param rowKey row to look up
+ * @return false when the empty column is missing for the row or its latest
+ * cell is TTL-expired, otherwise true
+ * @throws IOException if the region read fails
+ */
+ private boolean checkEmptyColumnNotExpired(byte[] rowKey) throws
IOException {
+ LOG.warn("Scan " + scan + " did not return the empty column for "
+ region.getRegionInfo().getTable().getNameAsString());
+ Get get = new Get(rowKey);
+ get.setTimeRange(minTimestamp, maxTimestamp);
+ get.addColumn(emptyCF, emptyCQ);
+ Result result = region.get(get);
+ if (result.isEmpty()) {
+ LOG.warn("The empty column does not exist in a row in " +
region.getRegionInfo().getTable().getNameAsString());
+ return false;
+ }
+ return !isTTLExpired(result.getColumnLatestCell(emptyCF, emptyCQ));
+ }
+
+        /**
+         * Decides whether the row in {@code cellList} is still within its TTL, based on the
+         * timestamp of the row's empty column cell.
+         *
+         * @param cellList cells of the current row; when the empty column cell is found and the
+         *                 row holds other cells too, the empty column cell is removed from the list
+         * @return true if the list is empty or the row is not expired, otherwise false
+         * @throws IOException if the fallback lookup of the empty column fails
+         */
+        private boolean checkRowNotExpired(List<Cell> cellList) throws IOException {
+            long cellListSize = cellList.size();
+            if (cellListSize == 0) {
+                return true;
+            }
+            Cell cell = null;
+            Iterator<Cell> cellIterator = cellList.iterator();
+            while (cellIterator.hasNext()) {
+                cell = cellIterator.next();
+                if (isEmptyColumn(cell)) {
+                    boolean expired = isTTLExpired(cell);
+                    // Runs once per scanned row: only build the message when debug is on.
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug(String.format(
+                                "**********VIEW-TTL: Row expired for [%s], expired = %s ***************",
+                                cell.toString(), expired));
+                    }
+                    // Empty column is not supposed to be returned to the client except it is
+                    // the only column included in the scan
+                    if (cellListSize > 1) {
+                        cellIterator.remove();
+                    }
+                    return !expired;
+                }
+            }
+            // The scan did not return the empty column: look it up directly, using the row key
+            // of the last cell visited.
+            byte[] rowKey = new byte[cell.getRowLength()];
+            System.arraycopy(cell.getRowArray(), cell.getRowOffset(), rowKey, 0,
+                    cell.getRowLength());
+            return checkEmptyColumnNotExpired(rowKey);
+        }
+
+        /**
+         * Returns the largest cell timestamp found in {@code cellList}, or 0 when the list is
+         * empty or no timestamp exceeds 0.
+         */
+        private long getMaxTimestamp(List<Cell> cellList) {
+            long latest = 0;
+            for (Cell current : cellList) {
+                latest = Math.max(latest, current.getTimestamp());
+            }
+            return latest;
+        }
+
+        /**
+         * A cell is expired when its timestamp plus the view TTL read from the scan lies
+         * strictly before this scanner's notion of "now".
+         */
+        private boolean isTTLExpired(Cell cell) {
+            long expiresAt = cell.getTimestamp() + ScanUtil.getViewTTL(this.scan);
+            return expiresAt < now;
+        }
+ }
+
+
+
+    /**
+     * Wraps the region scanner in a TTL-aware scanner when the scan asks for expired rows to be
+     * masked or deleted; otherwise the scanner is returned untouched.
+     *
+     * @param c observer context providing the coprocessor environment
+     * @param scan the scan being opened
+     * @param s the scanner opened for {@code scan}
+     * @return {@code s} itself, or a TTLAwareRegionScanner delegating to it
+     * @throws IOException if the TTL-aware scanner cannot be created
+     */
+    @Override
+    public RegionScanner postScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c,
+            Scan scan, RegionScanner s) throws IOException {
+
+        if (!ScanUtil.isMaskTTLExpiredRows(scan) && !ScanUtil.isDeleteTTLExpiredRows(scan)) {
+            return s;
+        }
+
+        // Serializing the whole scan to JSON is costly; only do it when debug logging is on.
+        if (LOG.isDebugEnabled()) {
+            LOG.debug(String.format(
+                    "********** VIEW-TTL: TTLAwareRegionObserver::postScannerOpen TTL for table = [%s], scan = [%s], VIEW_TTL = %d ***************",
+                    s.getRegionInfo().getTable().getNameAsString(),
+                    scan.toJSON(Integer.MAX_VALUE),
+                    ScanUtil.getViewTTL(scan)));
+        }
+        return new TTLAwareRegionScanner(c.getEnvironment(), scan, s);
+    }
+}
diff --git
a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
index c6974be..4988131 100644
---
a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
+++
b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
@@ -133,7 +133,8 @@ public class TableResultIterator implements ResultIterator {
this.caches = caches;
this.retry=plan.getContext().getConnection().getQueryServices().getProps()
.getInt(QueryConstants.HASH_JOIN_CACHE_RETRIES,
QueryConstants.DEFAULT_HASH_JOIN_CACHE_RETRIES);
- IndexUtil.setScanAttributesForIndexReadRepair(scan, table,
plan.getContext().getConnection());
+ ScanUtil.setScanAttributesForIndexReadRepair(scan, table,
plan.getContext().getConnection());
+ ScanUtil.setScanAttributesForViewTTL(scan, table,
plan.getContext().getConnection());
}
@Override
diff --git
a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index 20308a5..6ab37b5 100644
---
a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++
b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -162,6 +162,7 @@ import
org.apache.phoenix.coprocessor.MetaDataRegionObserver;
import org.apache.phoenix.coprocessor.ScanRegionObserver;
import org.apache.phoenix.coprocessor.SequenceRegionObserver;
import org.apache.phoenix.coprocessor.ServerCachingEndpointImpl;
+import org.apache.phoenix.coprocessor.TTLAwareRegionObserver;
import org.apache.phoenix.coprocessor.TaskRegionObserver;
import org.apache.phoenix.coprocessor.UngroupedAggregateRegionObserver;
import
org.apache.phoenix.coprocessor.generated.ChildLinkMetaDataProtos.CreateViewAddChildLinkRequest;
@@ -1099,6 +1100,15 @@ public class ConnectionQueryServicesImpl extends
DelegateQueryServices implement
}
}
}
+
+ // The priority for this co-processor should be set higher than
the GlobalIndexChecker so that the read repair scans
+ // are intercepted by the TTLAwareRegionObserver and only the rows
that are not ttl-expired are returned.
+ if (!SchemaUtil.isSystemTable(tableName)) {
+ if
(!newDesc.hasCoprocessor(TTLAwareRegionObserver.class.getName())) {
+
builder.addCoprocessor(TTLAwareRegionObserver.class.getName(), null,
priority-2, null);
+ }
+ }
+
} catch (IOException e) {
throw ServerUtil.parseServerException(e);
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
b/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
index a61fdf7..8b62d3c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
@@ -21,11 +21,9 @@ import static
org.apache.phoenix.coprocessor.MetaDataProtocol.PHOENIX_MAJOR_VERS
import static
org.apache.phoenix.coprocessor.MetaDataProtocol.PHOENIX_MINOR_VERSION;
import static
org.apache.phoenix.coprocessor.MetaDataProtocol.PHOENIX_PATCH_NUMBER;
import static
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES;
-import static
org.apache.phoenix.query.QueryConstants.ENCODED_EMPTY_COLUMN_NAME;
import static
org.apache.phoenix.query.QueryConstants.LOCAL_INDEX_COLUMN_FAMILY_PREFIX;
import static org.apache.phoenix.query.QueryConstants.VALUE_COLUMN_FAMILY;
import static org.apache.phoenix.query.QueryConstants.VALUE_COLUMN_QUALIFIER;
-import static org.apache.phoenix.schema.types.PDataType.TRUE_BYTES;
import static org.apache.phoenix.util.PhoenixRuntime.getTable;
import java.io.ByteArrayInputStream;
@@ -39,7 +37,6 @@ import java.util.Iterator;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
-import java.util.NavigableSet;
import java.util.concurrent.TimeUnit;
import com.google.common.cache.Cache;
@@ -59,8 +56,6 @@ import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
-import org.apache.hadoop.hbase.filter.Filter;
-import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils.BlockingRpcCallback;
import org.apache.hadoop.hbase.ipc.ServerRpcController;
@@ -83,7 +78,6 @@ import
org.apache.phoenix.coprocessor.generated.MetaDataProtos.MetaDataService;
import
org.apache.phoenix.coprocessor.generated.MetaDataProtos.UpdateIndexStateRequest;
import org.apache.phoenix.exception.SQLExceptionCode;
import org.apache.phoenix.exception.SQLExceptionInfo;
-import org.apache.phoenix.execute.BaseQueryPlan;
import org.apache.phoenix.execute.MutationState.MultiRowMutationState;
import org.apache.phoenix.execute.TupleProjector;
import org.apache.phoenix.expression.Expression;
@@ -91,9 +85,6 @@ import org.apache.phoenix.expression.KeyValueColumnExpression;
import org.apache.phoenix.expression.RowKeyColumnExpression;
import org.apache.phoenix.expression.SingleCellColumnExpression;
import org.apache.phoenix.expression.visitor.RowKeyExpressionVisitor;
-import org.apache.phoenix.filter.ColumnProjectionFilter;
-import org.apache.phoenix.filter.EncodedQualifiersColumnProjectionFilter;
-import org.apache.phoenix.filter.MultiEncodedCQKeyValueComparisonFilter;
import org.apache.phoenix.hbase.index.ValueGetter;
import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
@@ -101,7 +92,6 @@ import org.apache.phoenix.hbase.index.util.KeyValueBuilder;
import org.apache.phoenix.hbase.index.util.VersionUtil;
import org.apache.phoenix.index.GlobalIndexChecker;
import org.apache.phoenix.index.IndexMaintainer;
-import org.apache.phoenix.index.PhoenixIndexCodec;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
import org.apache.phoenix.jdbc.PhoenixStatement;
@@ -141,6 +131,7 @@ import
org.apache.phoenix.transaction.PhoenixTransactionProvider.Feature;
import com.google.common.collect.Lists;
public class IndexUtil {
+
public static final String INDEX_COLUMN_NAME_SEP = ":";
public static final byte[] INDEX_COLUMN_NAME_SEP_BYTES =
Bytes.toBytes(INDEX_COLUMN_NAME_SEP);
@@ -802,93 +793,4 @@ public class IndexUtil {
throw new IOException(e);
}
}
-
- private static boolean containsOneOrMoreColumn(Scan scan) {
- Map<byte[], NavigableSet<byte[]>> familyMap = scan.getFamilyMap();
- if (familyMap == null || familyMap.isEmpty()) {
- return false;
- }
- for (Map.Entry<byte[], NavigableSet<byte[]>> entry :
familyMap.entrySet()) {
- NavigableSet<byte[]> family = entry.getValue();
- if (family != null && !family.isEmpty()) {
- return true;
- }
- }
- return false;
- }
-
- public static void addEmptyColumnToScan(Scan scan, byte[] emptyCF, byte[]
emptyCQ) {
- boolean addedEmptyColumn = false;
- Iterator<Filter> iterator = ScanUtil.getFilterIterator(scan);
- while (iterator.hasNext()) {
- Filter filter = iterator.next();
- if (filter instanceof EncodedQualifiersColumnProjectionFilter) {
- ((EncodedQualifiersColumnProjectionFilter)
filter).addTrackedColumn(ENCODED_EMPTY_COLUMN_NAME);
- if (!addedEmptyColumn && containsOneOrMoreColumn(scan)) {
- scan.addColumn(emptyCF, emptyCQ);
- }
- }
- else if (filter instanceof ColumnProjectionFilter) {
- ((ColumnProjectionFilter) filter).addTrackedColumn(new
ImmutableBytesPtr(emptyCF), new ImmutableBytesPtr(emptyCQ));
- if (!addedEmptyColumn && containsOneOrMoreColumn(scan)) {
- scan.addColumn(emptyCF, emptyCQ);
- }
- }
- else if (filter instanceof MultiEncodedCQKeyValueComparisonFilter)
{
- ((MultiEncodedCQKeyValueComparisonFilter)
filter).setMinQualifier(ENCODED_EMPTY_COLUMN_NAME);
- }
- else if (!addedEmptyColumn && filter instanceof
FirstKeyOnlyFilter) {
- scan.addColumn(emptyCF, emptyCQ);
- addedEmptyColumn = true;
- }
- }
- if (!addedEmptyColumn && containsOneOrMoreColumn(scan)) {
- scan.addColumn(emptyCF, emptyCQ);
- }
- }
-
- public static void setScanAttributesForIndexReadRepair(Scan scan, PTable
table, PhoenixConnection phoenixConnection) throws SQLException {
- if (table.isTransactional() || table.getType() != PTableType.INDEX) {
- return;
- }
- PTable indexTable = table;
- if (indexTable.getIndexType() != PTable.IndexType.GLOBAL) {
- return;
- }
- String schemaName = indexTable.getParentSchemaName().getString();
- String tableName = indexTable.getParentTableName().getString();
- PTable dataTable;
- try {
- dataTable = PhoenixRuntime.getTable(phoenixConnection,
SchemaUtil.getTableName(schemaName, tableName));
- } catch (TableNotFoundException e) {
- // This index table must be being deleted. No need to set the scan
attributes
- return;
- }
- // MetaDataClient modifies the index table name for view indexes if
the parent view of an index has a child
- // view. This, we need to recreate a PTable object with the correct
table name for the rest of this code to work
- if (indexTable.getViewIndexId() != null &&
indexTable.getName().getString().contains(QueryConstants.CHILD_VIEW_INDEX_NAME_SEPARATOR))
{
- int lastIndexOf =
indexTable.getName().getString().lastIndexOf(QueryConstants.CHILD_VIEW_INDEX_NAME_SEPARATOR);
- String indexName =
indexTable.getName().getString().substring(lastIndexOf + 1);
- indexTable = PhoenixRuntime.getTable(phoenixConnection, indexName);
- }
- if (!dataTable.getIndexes().contains(indexTable)) {
- return;
- }
- if (scan.getAttribute(PhoenixIndexCodec.INDEX_PROTO_MD) == null) {
- ImmutableBytesWritable ptr = new ImmutableBytesWritable();
- IndexMaintainer.serialize(dataTable, ptr,
Collections.singletonList(indexTable), phoenixConnection);
- scan.setAttribute(PhoenixIndexCodec.INDEX_PROTO_MD,
ByteUtil.copyKeyBytesIfNecessary(ptr));
- }
- scan.setAttribute(BaseScannerRegionObserver.CHECK_VERIFY_COLUMN,
TRUE_BYTES);
- scan.setAttribute(BaseScannerRegionObserver.PHYSICAL_DATA_TABLE_NAME,
dataTable.getPhysicalName().getBytes());
- IndexMaintainer indexMaintainer =
indexTable.getIndexMaintainer(dataTable, phoenixConnection);
- byte[] emptyCF =
indexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary();
- byte[] emptyCQ = indexMaintainer.getEmptyKeyValueQualifier();
- scan.setAttribute(BaseScannerRegionObserver.EMPTY_COLUMN_FAMILY_NAME,
emptyCF);
-
scan.setAttribute(BaseScannerRegionObserver.EMPTY_COLUMN_QUALIFIER_NAME,
emptyCQ);
- if (scan.getAttribute(BaseScannerRegionObserver.VIEW_CONSTANTS) ==
null) {
- BaseQueryPlan.serializeViewConstantsIntoScan(scan, dataTable);
- }
- addEmptyColumnToScan(scan, emptyCF, emptyCQ);
- }
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java
b/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java
index 0a37411..4ffd49f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java
@@ -23,6 +23,8 @@ import static
org.apache.phoenix.coprocessor.BaseScannerRegionObserver.CUSTOM_AN
import static
org.apache.phoenix.coprocessor.BaseScannerRegionObserver.SCAN_ACTUAL_START_ROW;
import static
org.apache.phoenix.coprocessor.BaseScannerRegionObserver.SCAN_START_ROW_SUFFIX;
import static
org.apache.phoenix.coprocessor.BaseScannerRegionObserver.SCAN_STOP_ROW_SUFFIX;
+import static
org.apache.phoenix.query.QueryConstants.ENCODED_EMPTY_COLUMN_NAME;
+import static org.apache.phoenix.schema.types.PDataType.TRUE_BYTES;
import java.io.IOException;
import java.sql.SQLException;
@@ -41,6 +43,7 @@ import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FilterList;
+import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.io.TimeRange;
import org.apache.hadoop.hbase.util.Bytes;
@@ -53,13 +56,19 @@ import
org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
import org.apache.phoenix.coprocessor.MetaDataProtocol;
import org.apache.phoenix.exception.SQLExceptionCode;
import org.apache.phoenix.exception.SQLExceptionInfo;
+import org.apache.phoenix.execute.BaseQueryPlan;
import org.apache.phoenix.execute.DescVarLengthFastByteComparisons;
import org.apache.phoenix.filter.BooleanExpressionFilter;
+import org.apache.phoenix.filter.ColumnProjectionFilter;
import org.apache.phoenix.filter.DistinctPrefixFilter;
+import org.apache.phoenix.filter.EncodedQualifiersColumnProjectionFilter;
import org.apache.phoenix.filter.MultiEncodedCQKeyValueComparisonFilter;
import org.apache.phoenix.filter.SkipScanFilter;
import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
import org.apache.phoenix.hbase.index.util.VersionUtil;
+import org.apache.phoenix.index.IndexMaintainer;
+import org.apache.phoenix.index.PhoenixIndexCodec;
+import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.query.KeyRange;
import org.apache.phoenix.query.KeyRange.Bound;
import org.apache.phoenix.query.QueryConstants;
@@ -70,8 +79,10 @@ import org.apache.phoenix.schema.PColumn;
import org.apache.phoenix.schema.PName;
import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.schema.PTable.IndexType;
+import org.apache.phoenix.schema.PTableType;
import org.apache.phoenix.schema.RowKeySchema;
import org.apache.phoenix.schema.SortOrder;
+import org.apache.phoenix.schema.TableNotFoundException;
import org.apache.phoenix.schema.ValueSchema.Field;
import org.apache.phoenix.schema.types.PDataType;
import org.apache.phoenix.schema.types.PVarbinary;
@@ -961,4 +972,140 @@ public class ScanUtil {
public static void setClientVersion(Scan scan, int version) {
scan.setAttribute(BaseScannerRegionObserver.CLIENT_VERSION,
Bytes.toBytes(version));
}
+
+    /**
+     * Reads the view TTL attached to a scan.
+     *
+     * @param scan the scan whose VIEW_TTL attribute is consulted
+     * @return the decoded attribute value, or 0 when the attribute is not set
+     */
+    public static long getViewTTL(Scan scan) {
+        byte[] ttlBytes = scan.getAttribute(BaseScannerRegionObserver.VIEW_TTL);
+        return ttlBytes == null ? 0L : Bytes.toLong(ttlBytes);
+    }
+
+ public static boolean isMaskTTLExpiredRows(Scan scan) {
+ return
scan.getAttribute(BaseScannerRegionObserver.MASK_VIEW_TTL_EXPIRED) != null &&
+
(Bytes.compareTo(scan.getAttribute(BaseScannerRegionObserver.MASK_VIEW_TTL_EXPIRED),
+ PDataType.TRUE_BYTES) == 0)
+ && scan.getAttribute(BaseScannerRegionObserver.VIEW_TTL) !=
null;
+ }
+
+ public static boolean isDeleteTTLExpiredRows(Scan scan) {
+ return
scan.getAttribute(BaseScannerRegionObserver.DELETE_VIEW_TTL_EXPIRED) != null &&
(
+
Bytes.compareTo(scan.getAttribute(BaseScannerRegionObserver.DELETE_VIEW_TTL_EXPIRED),
+ PDataType.TRUE_BYTES) == 0)
+ && scan.getAttribute(BaseScannerRegionObserver.VIEW_TTL) !=
null;
+ }
+
+    /**
+     * Makes sure the empty column is visible to a scan: registers it with any column
+     * projection / encoded-qualifier filters found on the scan and adds it to the scan's
+     * column set when the scan selects explicit columns or uses a FirstKeyOnlyFilter.
+     *
+     * @param scan the scan to adjust
+     * @param emptyCF the empty column's family
+     * @param emptyCQ the empty column's qualifier
+     */
+    public static void addEmptyColumnToScan(Scan scan, byte[] emptyCF, byte[] emptyCQ) {
+        boolean addedEmptyColumn = false;
+        Iterator<Filter> iterator = ScanUtil.getFilterIterator(scan);
+        while (iterator.hasNext()) {
+            Filter filter = iterator.next();
+            if (filter instanceof EncodedQualifiersColumnProjectionFilter) {
+                ((EncodedQualifiersColumnProjectionFilter) filter).addTrackedColumn(ENCODED_EMPTY_COLUMN_NAME);
+                if (!addedEmptyColumn && containsOneOrMoreColumn(scan)) {
+                    scan.addColumn(emptyCF, emptyCQ);
+                    // Remember the addition so later filters and the final check do not
+                    // add the same column again.
+                    addedEmptyColumn = true;
+                }
+            } else if (filter instanceof ColumnProjectionFilter) {
+                ((ColumnProjectionFilter) filter).addTrackedColumn(new ImmutableBytesPtr(emptyCF),
+                        new ImmutableBytesPtr(emptyCQ));
+                if (!addedEmptyColumn && containsOneOrMoreColumn(scan)) {
+                    scan.addColumn(emptyCF, emptyCQ);
+                    addedEmptyColumn = true;
+                }
+            } else if (filter instanceof MultiEncodedCQKeyValueComparisonFilter) {
+                ((MultiEncodedCQKeyValueComparisonFilter) filter).setMinQualifier(ENCODED_EMPTY_COLUMN_NAME);
+            } else if (!addedEmptyColumn && filter instanceof FirstKeyOnlyFilter) {
+                // With a FirstKeyOnlyFilter the column is added unconditionally.
+                scan.addColumn(emptyCF, emptyCQ);
+                addedEmptyColumn = true;
+            }
+        }
+        // No filter triggered the addition: still add the column when the scan names
+        // explicit columns.
+        if (!addedEmptyColumn && containsOneOrMoreColumn(scan)) {
+            scan.addColumn(emptyCF, emptyCQ);
+        }
+    }
+
+    /**
+     * Checks whether the scan names at least one explicit column qualifier.
+     *
+     * @param scan the scan whose family map is inspected
+     * @return true when any family in the scan's family map has a non-empty qualifier set
+     */
+    public static boolean containsOneOrMoreColumn(Scan scan) {
+        Map<byte[], NavigableSet<byte[]>> familyMap = scan.getFamilyMap();
+        if (familyMap != null) {
+            for (NavigableSet<byte[]> qualifiers : familyMap.values()) {
+                if (qualifiers != null && !qualifiers.isEmpty()) {
+                    return true;
+                }
+            }
+        }
+        return false;
+    }
+
+ public static void setScanAttributesForIndexReadRepair(Scan scan, PTable
table,
+ PhoenixConnection phoenixConnection) throws SQLException {
+ if (table.isTransactional() || table.getType() != PTableType.INDEX) {
+ return;
+ }
+
+ PTable indexTable = table;
+ if (indexTable.getIndexType() != PTable.IndexType.GLOBAL) {
+ return;
+ }
+ String schemaName = indexTable.getParentSchemaName().getString();
+ String tableName = indexTable.getParentTableName().getString();
+ PTable dataTable;
+ try {
+ dataTable =
+ PhoenixRuntime.getTable(phoenixConnection,
+ SchemaUtil.getTableName(schemaName, tableName));
+ } catch (TableNotFoundException e) {
+ // This index table must be being deleted. No need to set the scan
attributes
+ return;
+ }
+ // MetaDataClient modifies the index table name for view indexes if
the parent view of an index has a child
+ // view. This, we need to recreate a PTable object with the correct
table name for the rest of this code to work
+ if
(table.getName().getString().contains(QueryConstants.CHILD_VIEW_INDEX_NAME_SEPARATOR))
{
+ int lastIndexOf =
table.getName().getString().lastIndexOf(QueryConstants.CHILD_VIEW_INDEX_NAME_SEPARATOR);
+ String indexName =
table.getName().getString().substring(lastIndexOf + 1);
+ indexTable = PhoenixRuntime.getTable(phoenixConnection, indexName);
+ }
+
+ if (!dataTable.getIndexes().contains(indexTable)) {
+ return;
+ }
+ if (scan.getAttribute(PhoenixIndexCodec.INDEX_PROTO_MD) == null) {
+ ImmutableBytesWritable ptr = new ImmutableBytesWritable();
+ IndexMaintainer.serialize(dataTable, ptr,
Collections.singletonList(indexTable),
+ phoenixConnection);
+ scan.setAttribute(PhoenixIndexCodec.INDEX_PROTO_MD,
ByteUtil.copyKeyBytesIfNecessary(ptr));
+ }
+ scan.setAttribute(BaseScannerRegionObserver.CHECK_VERIFY_COLUMN,
TRUE_BYTES);
+ scan.setAttribute(BaseScannerRegionObserver.PHYSICAL_DATA_TABLE_NAME,
+ dataTable.getPhysicalName().getBytes());
+ IndexMaintainer indexMaintainer =
indexTable.getIndexMaintainer(dataTable, phoenixConnection);
+ byte[] emptyCF =
indexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary();
+ byte[] emptyCQ = indexMaintainer.getEmptyKeyValueQualifier();
+ scan.setAttribute(BaseScannerRegionObserver.EMPTY_COLUMN_FAMILY_NAME,
emptyCF);
+
scan.setAttribute(BaseScannerRegionObserver.EMPTY_COLUMN_QUALIFIER_NAME,
emptyCQ);
+ if (scan.getAttribute(BaseScannerRegionObserver.VIEW_CONSTANTS) ==
null) {
+ BaseQueryPlan.serializeViewConstantsIntoScan(scan, dataTable);
+ }
+ addEmptyColumnToScan(scan, emptyCF, emptyCQ);
+ }
+
+    /**
+     * Marks a scan so that TTL-expired rows are masked, when the table has a non-zero
+     * view TTL. Sets the empty column family/qualifier, the mask flag and the TTL value
+     * as scan attributes, then adds the empty column to the scan. Tables whose view TTL
+     * is 0 leave the scan untouched.
+     *
+     * @param scan the scan to decorate
+     * @param table the table being scanned
+     * @param phoenixConnection not used by this method
+     * @throws SQLException declared by the signature; nothing here throws it directly
+     */
+    public static void setScanAttributesForViewTTL(Scan scan, PTable table,
+            PhoenixConnection phoenixConnection) throws SQLException {
+        if (table.getViewTTL() == 0) {
+            return;
+        }
+
+        byte[] emptyCF = SchemaUtil.getEmptyColumnFamily(table);
+        byte[] emptyCQ;
+        if (table.getEncodingScheme() == PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS) {
+            emptyCQ = QueryConstants.EMPTY_COLUMN_BYTES;
+        } else {
+            emptyCQ = table.getEncodingScheme().encode(QueryConstants.ENCODED_EMPTY_COLUMN_NAME);
+        }
+
+        scan.setAttribute(BaseScannerRegionObserver.EMPTY_COLUMN_FAMILY_NAME, emptyCF);
+        scan.setAttribute(BaseScannerRegionObserver.EMPTY_COLUMN_QUALIFIER_NAME, emptyCQ);
+        scan.setAttribute(BaseScannerRegionObserver.MASK_VIEW_TTL_EXPIRED, PDataType.TRUE_BYTES);
+        scan.setAttribute(BaseScannerRegionObserver.VIEW_TTL,
+                Bytes.toBytes(Long.valueOf(table.getViewTTL())));
+        addEmptyColumnToScan(scan, emptyCF, emptyCQ);
+    }
}
diff --git
a/phoenix-core/src/test/java/org/apache/phoenix/query/PhoenixTestBuilder.java
b/phoenix-core/src/test/java/org/apache/phoenix/query/PhoenixTestBuilder.java
index ccc378f..8158df9 100644
---
a/phoenix-core/src/test/java/org/apache/phoenix/query/PhoenixTestBuilder.java
+++
b/phoenix-core/src/test/java/org/apache/phoenix/query/PhoenixTestBuilder.java
@@ -20,7 +20,11 @@ package org.apache.phoenix.query;
import com.google.common.base.Joiner;
import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import com.google.common.collect.Table;
+import com.google.common.collect.TreeBasedTable;
import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.jdbc.PhoenixStatement;
import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.schema.PTableKey;
import org.apache.phoenix.util.PropertiesUtil;
@@ -31,9 +35,12 @@ import org.slf4j.LoggerFactory;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
+import java.sql.ResultSet;
import java.sql.SQLException;
+import java.sql.Statement;
import java.util.List;
import java.util.Properties;
+import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import static java.util.Arrays.asList;
@@ -189,6 +196,41 @@ public class PhoenixTestBuilder {
List<Object> getValues(int rowIndex);
}
+    /**
+     * A data reader to be used in tests to read test data back from the test db.
+     */
+    public interface DataReader {
+        /** Returns the columns that need to be projected during DML queries. */
+        List<String> getValidationColumns();
+
+        void setValidationColumns(List<String> validationColumns);
+
+        /** Returns the columns that represent the pk/unique key for this data set. */
+        List<String> getRowKeyColumns();
+
+        void setRowKeyColumns(List<String> rowKeyColumns);
+
+        /** Returns the connection to be used for DML queries. */
+        Connection getConnection();
+
+        void setConnection(Connection connection);
+
+        /**
+         * Returns the target entity - whether to use the table, the global-view,
+         * the tenant-view or an index table.
+         */
+        String getTargetEntity();
+
+        void setTargetEntity(String targetEntity);
+
+        /** Builds the DML statement and returns the SQL string. */
+        String getDML();
+
+        String setDML(String dmlStatement);
+
+        /** Template method to read a batch of rows using the above sql. */
+        void readRows() throws SQLException;
+
+        /** Returns the data that was read, as a Table. */
+        Table<String, String, Object> getDataTable();
+    }
+
// A Data Writer to be used in tests to upsert sample data (@see
TestDataSupplier) into the sample schema.
public interface DataWriter {
// returns the columns that need to be upserted,
@@ -212,66 +254,142 @@ public class PhoenixTestBuilder {
void setTargetEntity(String targetEntity);
+ // returns the columns that are set as the pk/unique key for this data set,
+ List<String> getRowKeyColumns();
+
+ void setRowKeyColumns(List<String> rowKeyColumns);
+
// return the data provider for this writer
DataSupplier getTestDataSupplier();
- // template method to upsert rows using the above info.
- void upsertRow(int rowIndex) throws SQLException;
-
void setDataSupplier(DataSupplier dataSupplier);
+
+ // template method to upsert a single row using the above info.
+ List<Object> upsertRow(int rowIndex) throws SQLException;
+
+ // template method to upsert a batch of rows using the above info.
+ void upsertRows(int numRows) throws SQLException;
+
+ // Get the data that was written as a Table
+ Table<String, String, Object> getDataTable();
}
- /**
- * Test SchemaBuilder defaults.
- */
- public static class DDLDefaults {
- public static final int MAX_ROWS = 10000;
- public static List<String> TABLE_PK_TYPES = asList("CHAR(15)",
"CHAR(3)");
- public static List<String> GLOBAL_VIEW_PK_TYPES = asList("CHAR(15)");
- public static List<String> TENANT_VIEW_PK_TYPES = asList("CHAR(15)");
+ // Provides template method for returning result set
+ public static abstract class AbstractDataReader implements DataReader {
+ Table<String, String, Object> dataTable = TreeBasedTable.create();
- public static List<String> COLUMN_TYPES = asList("VARCHAR", "VARCHAR",
"VARCHAR");
- public static List<String> TABLE_COLUMNS = asList("COL1", "COL2",
"COL3");
- public static List<String> GLOBAL_VIEW_COLUMNS = asList("COL4",
"COL5", "COL6");
- public static List<String> TENANT_VIEW_COLUMNS = asList("COL7",
"COL8", "COL9");
+ public Table<String, String, Object> getDataTable() {
+ return dataTable;
+ }
- public static List<String> TABLE_COLUMN_FAMILIES = asList(null, null,
null);
- public static List<String> GLOBAL_VIEW_COLUMN_FAMILIES = asList(null,
null, null);
- public static List<String> TENANT_VIEW_COLUMN_FAMILIES = asList(null,
null, null);
+ // Read batch of rows
+ public void readRows() throws SQLException {
+ dataTable.clear();
+ String sql = getDML();
+ Connection connection = getConnection();
+ try (Statement stmt = connection.createStatement()) {
+
+ final PhoenixStatement pstmt =
stmt.unwrap(PhoenixStatement.class);
+ ResultSet rs = pstmt.executeQuery(sql);
+ List<String> cols = getValidationColumns();
+ List<Object> values = Lists.newArrayList();
+ Set<String> rowKeys = getRowKeyColumns() == null ||
getRowKeyColumns().isEmpty() ?
+ Sets.<String>newHashSet() :
+ Sets.newHashSet(getRowKeyColumns());
+ List<String> rowKeyParts = Lists.newArrayList();
+ while (rs.next()) {
+ for (String col : cols) {
+ Object val = rs.getObject(col);
+ values.add(val);
+ if (rowKeys.isEmpty()) {
+ rowKeyParts.add(val.toString());
+ }
+ else if (rowKeys.contains(col)) {
+ rowKeyParts.add(val.toString());
+ }
+ }
- public static List<String> TABLE_PK_COLUMNS = asList("OID", "KP");
- public static List<String> GLOBAL_VIEW_PK_COLUMNS = asList("ID");
- public static List<String> TENANT_VIEW_PK_COLUMNS = asList("ZID");
+ String rowKey = Joiner.on("-").join(rowKeyParts);
+ for (int v = 0; v < values.size(); v++) {
+ dataTable.put(rowKey,cols.get(v), values.get(v));
+ }
+ values.clear();
+ rowKeyParts.clear();
+ }
+ LOGGER.info(String.format("########## rows: %d",
dataTable.rowKeySet().size()));
- public static List<String> TABLE_INDEX_COLUMNS = asList("COL1");
- public static List<String> TABLE_INCLUDE_COLUMNS = asList("COL3");
+ } catch (SQLException e) {
+ LOGGER.error(String.format(" Error [%s] initializing Reader. ",
+ e.getMessage()));
+ throw e;
+ }
+ }
+ }
- public static List<String> GLOBAL_VIEW_INDEX_COLUMNS = asList("COL4");
- public static List<String> GLOBAL_VIEW_INCLUDE_COLUMNS =
asList("COL6");
+ // An implementation of the DataReader.
+ public static class BasicDataReader extends AbstractDataReader {
- public static List<String> TENANT_VIEW_INDEX_COLUMNS = asList("COL9");
- public static List<String> TENANT_VIEW_INCLUDE_COLUMNS =
asList("COL7");
+ Connection connection;
+ String targetEntity;
+ String dmlStatement;
+ List<String> validationColumns;
+ List<String> rowKeyColumns;
- public static String
- DEFAULT_TABLE_PROPS =
- "COLUMN_ENCODED_BYTES=0,
MULTI_TENANT=true,DEFAULT_COLUMN_FAMILY='Z'";
- public static String DEFAULT_TABLE_INDEX_PROPS = "";
- public static String DEFAULT_GLOBAL_VIEW_PROPS = "";
- public static String DEFAULT_GLOBAL_VIEW_INDEX_PROPS = "";
- public static String DEFAULT_TENANT_VIEW_PROPS = "";
- public static String DEFAULT_TENANT_VIEW_INDEX_PROPS = "";
- public static String DEFAULT_KP = "0EC";
- public static String DEFAULT_SCHEMA_NAME = "TEST_ENTITY";
- public static String DEFAULT_TENANT_ID_FMT = "00D0t%03d%s";
- public static String DEFAULT_CONNECT_URL = "jdbc:phoenix:localhost";
+ @Override public String getDML() {
+ return this.dmlStatement;
+ }
+ @Override public String setDML(String dmlStatement) {
+ return this.dmlStatement = dmlStatement;
+ }
+
+ // returns the columns that need to be projected during DML queries,
+ @Override public List<String> getValidationColumns() {
+ return this.validationColumns;
+ }
+
+ @Override public void setValidationColumns(List<String>
validationColumns) {
+ this.validationColumns = validationColumns;
+ }
+
+ // returns the columns that are set as the pk/unique key for this data set,
+ @Override public List<String> getRowKeyColumns() {
+ return this.rowKeyColumns;
+ }
+
+ @Override public void setRowKeyColumns(List<String> rowKeyColumns) {
+ this.rowKeyColumns = rowKeyColumns;
+ }
+
+ @Override public Connection getConnection() {
+ return connection;
+ }
+
+ @Override public void setConnection(Connection connection) {
+ this.connection = connection;
+ }
+
+ @Override public String getTargetEntity() {
+ return targetEntity;
+ }
+
+ @Override public void setTargetEntity(String targetEntity) {
+ this.targetEntity = targetEntity;
+ }
}
+
// Provides template method for upserting rows
public static abstract class AbstractDataWriter implements DataWriter {
+ Table<String, String, Object> dataTable = TreeBasedTable.create();
- public void upsertRow(int rowIndex) throws SQLException {
+ public Table<String, String, Object> getDataTable() {
+ return dataTable;
+ }
+
+ // Upsert one row.
+ public List<Object> upsertRow(int rowIndex) throws SQLException {
List<String> upsertColumns = Lists.newArrayList();
List<Object> upsertValues = Lists.newArrayList();
@@ -294,7 +412,6 @@ public class PhoenixTestBuilder {
buf.append("?,");
}
buf.setCharAt(buf.length() - 1, ')');
-
LOGGER.info(buf.toString());
Connection connection = getConnection();
@@ -305,16 +422,83 @@ public class PhoenixTestBuilder {
stmt.execute();
connection.commit();
}
+ return upsertValues;
+ }
+
+        /**
+         * Upserts rows 1..numRows from the test data supplier into the target entity and
+         * records every written value in the data table, keyed by the "-"-joined row key
+         * values. All rows are committed together after the last upsert.
+         *
+         * @param numRows number of rows to request from the data supplier
+         * @throws SQLException if preparing, executing or committing the upserts fails
+         */
+        public void upsertRows(int numRows) throws SQLException {
+            dataTable.clear();
+
+            // Figure out the upsert columns based on whether this is a full or partial row update.
+            final boolean fullRow = getColumnPositionsToUpdate().isEmpty();
+            List<String> targetColumns = Lists.newArrayList();
+            if (fullRow) {
+                targetColumns.addAll(getUpsertColumns());
+            } else {
+                List<String> allColumns = getUpsertColumns();
+                for (int pos : getColumnPositionsToUpdate()) {
+                    targetColumns.add(allColumns.get(pos));
+                }
+            }
+
+            // Without explicit row key columns every upsert column contributes to the key.
+            Set<String> keyColumns;
+            if (getRowKeyColumns() == null || getRowKeyColumns().isEmpty()) {
+                keyColumns = Sets.<String>newHashSet(getUpsertColumns());
+            } else {
+                keyColumns = Sets.newHashSet(getRowKeyColumns());
+            }
+
+            // Positions, within the columns actually written, that make up the row key.
+            List<Integer> keyPositions = Lists.newArrayList();
+            for (int i = 0; i < targetColumns.size(); i++) {
+                if (keyColumns.contains(targetColumns.get(i))) {
+                    keyPositions.add(i);
+                }
+            }
+
+            StringBuilder sql = new StringBuilder("UPSERT INTO ");
+            sql.append(getTargetEntity());
+            sql.append(" (").append(Joiner.on(",").join(targetColumns)).append(") VALUES(");
+            for (int i = 0; i < targetColumns.size(); i++) {
+                sql.append("?,");
+            }
+            // Turn the trailing character into the closing parenthesis.
+            sql.setCharAt(sql.length() - 1, ')');
+            LOGGER.info(sql.toString());
+
+            Connection connection = getConnection();
+            try (PreparedStatement stmt = connection.prepareStatement(sql.toString())) {
+
+                for (int row = 1; row <= numRows; row++) {
+                    List<Object> suppliedValues = getTestDataSupplier().getValues(row);
+                    List<Object> rowValues = Lists.newArrayList();
+                    if (fullRow) {
+                        rowValues.addAll(suppliedValues);
+                    } else {
+                        for (int pos : getColumnPositionsToUpdate()) {
+                            rowValues.add(suppliedValues.get(pos));
+                        }
+                    }
+
+                    List<String> keyParts = Lists.newArrayList();
+                    for (int keyPos : keyPositions) {
+                        keyParts.add(rowValues.get(keyPos).toString());
+                    }
+                    String rowKey = Joiner.on("-").join(keyParts);
+
+                    for (int col = 0; col < rowValues.size(); col++) {
+                        Object value = rowValues.get(col);
+                        stmt.setObject(col + 1, value);
+                        dataTable.put(rowKey, targetColumns.get(col), value);
+                    }
+                    stmt.execute();
+                }
+                connection.commit();
+            }
+        }
+
}
- // An implementation of the TestDataWriter.
+ // An implementation of the DataWriter.
public static class BasicDataWriter extends AbstractDataWriter {
List<String> upsertColumns = Lists.newArrayList();
List<Integer> columnPositionsToUpdate = Lists.newArrayList();
DataSupplier dataSupplier;
Connection connection;
String targetEntity;
+ List<String> rowKeyColumns;
+
@Override public List<String> getUpsertColumns() {
return upsertColumns;
@@ -348,6 +532,15 @@ public class PhoenixTestBuilder {
this.targetEntity = targetEntity;
}
+ // returns the columns that are set as the pk/unique key for this data set,
+ @Override public List<String> getRowKeyColumns() {
+ return this.rowKeyColumns;
+ }
+
+ @Override public void setRowKeyColumns(List<String> rowKeyColumns) {
+ this.rowKeyColumns = rowKeyColumns;
+ }
+
@Override public DataSupplier getTestDataSupplier() {
return dataSupplier;
}
@@ -358,6 +551,53 @@ public class PhoenixTestBuilder {
}
/**
+ * Test SchemaBuilder defaults.
+ *
+ * Groups the default PK column names and types, regular column names and
+ * types, column families, index/include columns and property strings for
+ * the three entity levels named in the fields: table, global view and
+ * tenant view. Every default column family entry is null.
+ *
+ * NOTE(review): only MAX_ROWS is final. The remaining fields are public,
+ * static and non-final, so any code can reassign them; confirm no test
+ * relies on that before tightening them.
+ */
+ public static class DDLDefaults {
+ public static final int MAX_ROWS = 10000;
+ public static List<String> TABLE_PK_TYPES = asList("CHAR(15)",
"CHAR(3)");
+ public static List<String> GLOBAL_VIEW_PK_TYPES = asList("CHAR(15)");
+ public static List<String> TENANT_VIEW_PK_TYPES = asList("CHAR(15)");
+
+ public static List<String> COLUMN_TYPES = asList("VARCHAR", "VARCHAR",
"VARCHAR");
+ public static List<String> TABLE_COLUMNS = asList("COL1", "COL2",
"COL3");
+ public static List<String> GLOBAL_VIEW_COLUMNS = asList("COL4",
"COL5", "COL6");
+ public static List<String> TENANT_VIEW_COLUMNS = asList("COL7",
"COL8", "COL9");
+
+ public static List<String> TABLE_COLUMN_FAMILIES = asList(null, null,
null);
+ public static List<String> GLOBAL_VIEW_COLUMN_FAMILIES = asList(null,
null, null);
+ public static List<String> TENANT_VIEW_COLUMN_FAMILIES = asList(null,
null, null);
+
+ public static List<String> TABLE_PK_COLUMNS = asList("OID", "KP"); // presumably paired by position with TABLE_PK_TYPES - confirm
+ public static List<String> GLOBAL_VIEW_PK_COLUMNS = asList("ID");
+ public static List<String> TENANT_VIEW_PK_COLUMNS = asList("ZID");
+
+ public static List<String> TABLE_INDEX_COLUMNS = asList("COL1");
+ public static List<String> TABLE_INCLUDE_COLUMNS = asList("COL3");
+
+ public static List<String> GLOBAL_VIEW_INDEX_COLUMNS = asList("COL4");
+ public static List<String> GLOBAL_VIEW_INCLUDE_COLUMNS =
asList("COL6");
+
+ public static List<String> TENANT_VIEW_INDEX_COLUMNS = asList("COL9");
+ public static List<String> TENANT_VIEW_INCLUDE_COLUMNS =
asList("COL7");
+
+ public static String
+ DEFAULT_TABLE_PROPS =
+ "COLUMN_ENCODED_BYTES=0,
MULTI_TENANT=true,DEFAULT_COLUMN_FAMILY='Z'";
+ public static String DEFAULT_TABLE_INDEX_PROPS = "";
+ public static String DEFAULT_GLOBAL_VIEW_PROPS = "";
+ public static String DEFAULT_GLOBAL_VIEW_INDEX_PROPS = "";
+ public static String DEFAULT_TENANT_VIEW_PROPS = "";
+ public static String DEFAULT_TENANT_VIEW_INDEX_PROPS = "";
+ public static String DEFAULT_KP = "ECZ"; // presumably the value for the KP pk column, whose listed type is CHAR(3) - confirm
+ public static String DEFAULT_SCHEMA_NAME = "TEST_ENTITY";
+ public static String DEFAULT_TENANT_ID_FMT = "00D0t%03d%s"; // format pattern: zero-padded 3-digit integer, then a string
+
+ public static String DEFAULT_CONNECT_URL = "jdbc:phoenix:localhost";
+
+ }
+
+ /**
* Schema builder for test writers to prepare various test scenarios.
* It can be used to define the following type of schemas -
* 1. Simple Table.
@@ -741,20 +981,20 @@ public class PhoenixTestBuilder {
"useTenantConnectionForGlobalView and
useGlobalConnectionOnly both cannot be true");
}
- String tableName = SchemaUtil.getEscapedArgument("T_" +
dataOptions.uniqueName);
- String globalViewName = SchemaUtil.getEscapedArgument("V_" +
dataOptions.uniqueName);
+ String tableName = SchemaUtil.normalizeIdentifier("T_" +
dataOptions.uniqueName);
+ String globalViewName = SchemaUtil.normalizeIdentifier("V_" +
dataOptions.uniqueName);
String
tableSchemaName =
- tableEnabled ?
SchemaUtil.getEscapedArgument(tableOptions.schemaName) : "";
+ tableEnabled ?
SchemaUtil.normalizeIdentifier(tableOptions.schemaName) : "";
String
globalViewSchemaName =
globalViewEnabled ?
-
SchemaUtil.getEscapedArgument(globalViewOptions.schemaName) :
+
SchemaUtil.normalizeIdentifier(globalViewOptions.schemaName) :
"";
String
tenantViewSchemaName =
tenantViewEnabled ?
-
SchemaUtil.getEscapedArgument(tenantViewOptions.schemaName) :
+
SchemaUtil.normalizeIdentifier(tenantViewOptions.schemaName) :
"";
entityTableName = SchemaUtil.getTableName(tableSchemaName,
tableName);
entityGlobalViewName =
SchemaUtil.getTableName(globalViewSchemaName, globalViewName);
@@ -767,7 +1007,7 @@ public class PhoenixTestBuilder {
(String.format("Z%02d",
dataOptions.getViewNumber())) :
DDLDefaults.DEFAULT_KP);
- String tenantViewName =
SchemaUtil.getEscapedArgument(entityKeyPrefix);
+ String tenantViewName =
SchemaUtil.normalizeIdentifier(entityKeyPrefix);
entityTenantViewName =
SchemaUtil.getTableName(tenantViewSchemaName, tenantViewName);
String globalViewCondition = String.format("KP = '%s'",
entityKeyPrefix);
@@ -787,7 +1027,7 @@ public class PhoenixTestBuilder {
if (tableIndexEnabled && !tableIndexCreated) {
String
indexOnTableName =
-
SchemaUtil.getEscapedArgument(String.format("IDX_%s",
+
SchemaUtil.normalizeIdentifier(String.format("IDX_%s",
SchemaUtil.normalizeIdentifier(tableName)));
globalConnection.createStatement().execute(
buildCreateIndexStmt(indexOnTableName,
entityTableName,