[
https://issues.apache.org/jira/browse/PHOENIX-5601?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17231759#comment-17231759
]
ASF GitHub Bot commented on PHOENIX-5601:
-----------------------------------------
ChinmaySKulkarni commented on a change in pull request #955:
URL: https://github.com/apache/phoenix/pull/955#discussion_r523158369
##########
File path:
phoenix-core/src/main/java/org/apache/phoenix/coprocessor/PhoenixTTLRegionObserver.java
##########
@@ -0,0 +1,368 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.coprocessor;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CoprocessorEnvironment;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.io.TimeRange;
+import org.apache.hadoop.hbase.regionserver.Region;
+import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.regionserver.ScannerContext;
+import org.apache.hadoop.hbase.util.Bytes;
+import
org.apache.phoenix.coprocessor.metrics.MetricsPhoenixCoprocessorSourceFactory;
+import org.apache.phoenix.coprocessor.metrics.MetricsPhoenixTTLSource;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
+import org.apache.phoenix.util.ScanUtil;
+import org.apache.phoenix.util.ServerUtil;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+
+import static
org.apache.phoenix.coprocessor.BaseScannerRegionObserver.EMPTY_COLUMN_FAMILY_NAME;
+import static
org.apache.phoenix.coprocessor.BaseScannerRegionObserver.EMPTY_COLUMN_QUALIFIER_NAME;
+
+/**
+ * Coprocessor that checks whether the row is expired based on the TTL spec.
+ */
+public class PhoenixTTLRegionObserver extends BaseRegionObserver {
+ private static final Log LOG =
LogFactory.getLog(PhoenixTTLRegionObserver.class);
+ private MetricsPhoenixTTLSource metricSource;
+
+ @Override public void start(CoprocessorEnvironment e) throws IOException {
+ super.start(e);
+ metricSource =
MetricsPhoenixCoprocessorSourceFactory.getInstance().getPhoenixTTLSource();
+ }
+
+ @Override public void stop(CoprocessorEnvironment e) throws IOException {
+ super.stop(e);
+ }
+
+ @Override
+ public RegionScanner
postScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c, Scan scan,
+ RegionScanner s) throws IOException {
+
+ if (!ScanUtil.isMaskTTLExpiredRows(scan) &&
!ScanUtil.isDeleteTTLExpiredRows(scan)) {
+ return s;
+ }
+
+ if (ScanUtil.isMaskTTLExpiredRows(scan)) {
+ metricSource.incrementMaskExpiredRequestCount();
+ }
+
+ if (ScanUtil.isDeleteTTLExpiredRows(scan)) {
+ metricSource.incrementDeleteExpiredRequestCount();
+ }
+
+
scan.setAttribute(PhoenixTTLRegionScanner.MASK_PHOENIX_TTL_EXPIRED_REQUEST_ID_ATTR,
+ Bytes.toBytes(metricSource.getMaskExpiredRequestCount()));
+
+ LOG.debug(String.format(
+ "********** PHOENIX-TTL:
PhoenixTTLRegionObserver::postScannerOpen TTL for table = "
+ + "[%s], scan = [%s], PHOENIX_TTL = %d
***************, numRequests=%d",
+ s.getRegionInfo().getTable().getNameAsString(),
scan.toJSON(Integer.MAX_VALUE),
+ ScanUtil.getPhoenixTTL(scan),
metricSource.getMaskExpiredRequestCount()));
+ return new PhoenixTTLRegionScanner(c.getEnvironment(), scan, s);
+ }
+
+ /**
+ * A region scanner that checks the TTL expiration of rows
+ */
+ private static class PhoenixTTLRegionScanner implements RegionScanner {
+ private static final String MASK_PHOENIX_TTL_EXPIRED_REQUEST_ID_ATTR =
+ "MASK_PHOENIX_TTL_EXPIRED_REQUEST_ID";
+
+ private final RegionScanner scanner;
+ private final Scan scan;
+ private final byte[] emptyCF;
+ private final byte[] emptyCQ;
+ private final Region region;
+ private final long minTimestamp;
+ private final long maxTimestamp;
+ private final long now;
+ private final boolean deleteIfExpired;
+ private final boolean maskIfExpired;
+ private final long requestId;
+ private long numRowsExpired;
+ private long numRowsScanned;
+ private long numRowsDeleted;
+ private boolean reported = false;
+
+ public PhoenixTTLRegionScanner(RegionCoprocessorEnvironment env, Scan
scan,
+ RegionScanner scanner) throws IOException {
+ this.scan = scan;
+ this.scanner = scanner;
+ byte[] requestIdBytes =
scan.getAttribute(MASK_PHOENIX_TTL_EXPIRED_REQUEST_ID_ATTR);
+ this.requestId = Bytes.toLong(requestIdBytes);
+
+ deleteIfExpired = ScanUtil.isDeleteTTLExpiredRows(scan);
+ maskIfExpired = !deleteIfExpired &&
ScanUtil.isMaskTTLExpiredRows(scan);
+
+ region = env.getRegion();
+ emptyCF = scan.getAttribute(EMPTY_COLUMN_FAMILY_NAME);
+ emptyCQ = scan.getAttribute(EMPTY_COLUMN_QUALIFIER_NAME);
+ byte[] txnScn =
scan.getAttribute(BaseScannerRegionObserver.TX_SCN);
+ if (txnScn != null) {
+ TimeRange timeRange = scan.getTimeRange();
+ scan.setTimeRange(timeRange.getMin(), Bytes.toLong(txnScn));
+ }
+ minTimestamp = scan.getTimeRange().getMin();
+ maxTimestamp = scan.getTimeRange().getMax();
+ now = maxTimestamp != HConstants.LATEST_TIMESTAMP ?
+ maxTimestamp :
+ EnvironmentEdgeManager.currentTimeMillis();
+ }
+
+ @Override public int getBatch() {
+ return scanner.getBatch();
+ }
+
+ @Override public long getMaxResultSize() {
+ return scanner.getMaxResultSize();
+ }
+
+ @Override public boolean next(List<Cell> result) throws IOException {
+ try {
+ boolean hasMore;
+ do {
+ hasMore = scanner.next(result);
+ if (result.isEmpty()) {
+ break;
+ }
+ numRowsScanned++;
+ if (maskIfExpired && checkRowNotExpired(result)) {
+ break;
+ }
+
+ if (deleteIfExpired && deleteRowIfExpired(result)) {
+ numRowsDeleted++;
+ break;
+ }
+ // skip this row
+ // 1. if the row has expired (checkRowNotExpired returned
false)
+ // 2. if the row was not deleted (deleteRowIfExpired
returned false and
+ // do not want it to count towards the deleted count)
+ if (maskIfExpired) {
+ numRowsExpired++;
+ }
+ result.clear();
+ } while (hasMore);
+ return hasMore;
+ } catch (Throwable t) {
+
ServerUtil.throwIOException(region.getRegionInfo().getRegionNameAsString(), t);
+ return false; // impossible
+ }
+ }
+
+ @Override public boolean next(List<Cell> result, ScannerContext
scannerContext)
+ throws IOException {
+ throw new IOException(
+ "next with scannerContext should not be called in Phoenix
environment");
+ }
+
+ @Override public boolean nextRaw(List<Cell> result, ScannerContext
scannerContext)
+ throws IOException {
+ throw new IOException(
+ "NextRaw with scannerContext should not be called in
Phoenix environment");
+ }
+
+ @Override public void close() throws IOException {
+ if (!reported) {
+ LOG.debug(String.format(
+ "***** PHOENIX-TTL-SCAN-STATS-CLOSE: " +
"request-id:[%d] = [%d, %d, %d]",
+ this.requestId, this.numRowsScanned,
this.numRowsExpired,
+ this.numRowsDeleted));
+ reported = true;
+ }
+ scanner.close();
+ }
+
+ @Override public HRegionInfo getRegionInfo() {
+ return scanner.getRegionInfo();
+ }
+
+ @Override public boolean isFilterDone() throws IOException {
+ return scanner.isFilterDone();
+ }
+
+ @Override public boolean reseek(byte[] row) throws IOException {
+ return scanner.reseek(row);
+ }
+
+ @Override public long getMvccReadPoint() {
+ return scanner.getMvccReadPoint();
+ }
+
+ @Override public boolean nextRaw(List<Cell> result) throws IOException
{
+ try {
+ boolean hasMore;
+ do {
+ hasMore = scanner.nextRaw(result);
+ if (result.isEmpty()) {
+ break;
+ }
+ numRowsScanned++;
+ if (maskIfExpired && checkRowNotExpired(result)) {
+ break;
+ }
+
+ if (deleteIfExpired && deleteRowIfExpired(result)) {
+ numRowsDeleted++;
+ break;
+ }
+ // skip this row
+ // 1. if the row has expired (checkRowNotExpired returned
false)
+ // 2. if the row was not deleted (deleteRowIfExpired
returned false and
+ // do not want it to count towards the deleted count)
+ if (maskIfExpired) {
+ numRowsExpired++;
+ }
+ result.clear();
+ } while (hasMore);
+ return hasMore;
+ } catch (Throwable t) {
+
ServerUtil.throwIOException(region.getRegionInfo().getRegionNameAsString(), t);
+ return false; // impossible
+ }
+ }
+
+ /**
+ * @param cellList is an input and output parameter and will either
include a valid row or be an empty list
+ * @return true if row expired and deleted or empty, otherwise false
+ * @throws IOException
+ */
+ private boolean deleteRowIfExpired(List<Cell> cellList) throws
IOException {
+
+ long cellListSize = cellList.size();
+ if (cellListSize == 0) {
+ return true;
+ }
+
+ Iterator<Cell> cellIterator = cellList.iterator();
+ Cell firstCell = cellIterator.next();
+ byte[] rowKey = new byte[firstCell.getRowLength()];
+ System.arraycopy(firstCell.getRowArray(),
firstCell.getRowOffset(), rowKey, 0,
+ firstCell.getRowLength());
+
+ boolean isRowExpired = !checkRowNotExpired(cellList);
+ if (isRowExpired) {
+ long ttl = ScanUtil.getPhoenixTTL(this.scan);
+ long ts = getMaxTimestamp(cellList);
+ LOG.debug(String.format(
+ "** PHOENIX-TTL: Deleting region = %s, row = %s,
delete-ts = %d, max-ts = %d ** ",
+ region.getRegionInfo().getTable().getNameAsString(),
Bytes.toString(rowKey),
+ now - ttl, ts));
+ Delete del = new Delete(rowKey, now - ttl);
+ Mutation[] mutations = new Mutation[] { del };
+ region.batchMutate(mutations, HConstants.NO_NONCE,
HConstants.NO_NONCE);
+ return true;
+ }
+ return false;
+ }
+
+ private boolean isEmptyColumn(Cell cell) {
+ return Bytes.compareTo(cell.getFamilyArray(),
cell.getFamilyOffset(),
+ cell.getFamilyLength(), emptyCF, 0, emptyCF.length) == 0 &&
+ Bytes.compareTo(cell.getQualifierArray(),
cell.getQualifierOffset(),
+ cell.getQualifierLength(), emptyCQ, 0,
emptyCQ.length) == 0;
+ }
+
+ // TODO : Remove it after we verify all SQLs include the empty column.
+ // Before we added ScanUtil.addEmptyColumnToScan
+ // some queries like select count(*) did not include the empty column
in scan,
+ // thus this method was the fallback in those cases.
+ private boolean checkEmptyColumnNotExpired(byte[] rowKey) throws
IOException {
+ LOG.warn("Scan " + scan + " did not return the empty column for "
+ region
+ .getRegionInfo().getTable().getNameAsString());
+ Get get = new Get(rowKey);
+ get.setTimeRange(minTimestamp, maxTimestamp);
+ get.addColumn(emptyCF, emptyCQ);
+ Result result = region.get(get);
+ if (result.isEmpty()) {
+ LOG.warn("The empty column does not exist in a row in " +
region.getRegionInfo()
+ .getTable().getNameAsString());
+ return false;
+ }
+ return !isTTLExpired(result.getColumnLatestCell(emptyCF, emptyCQ));
+ }
+
+ /**
+ * @param cellList is an input and output parameter and will either
include a valid row
+ * or be an empty list
+ * @return true if row not expired, otherwise false
+ * @throws IOException
+ */
+ private boolean checkRowNotExpired(List<Cell> cellList) throws
IOException {
Review comment:
@jpisaac can we please add these tests?
##########
File path: phoenix-core/src/test/java/org/apache/phoenix/util/ScanUtilTest.java
##########
@@ -466,4 +482,78 @@ public SortOrder getSortOrder() {
assertArrayEquals(expectedEndKey, endKey);
}
}
+
+ public static class PhoenixTTLScanUtilTest extends
BaseConnectionlessQueryTest {
Review comment:
Nice tests @jpisaac !
##########
File path:
phoenix-core/src/main/java/org/apache/phoenix/coprocessor/metrics/MetricsPhoenixCoprocessorSourceFactory.java
##########
@@ -0,0 +1,20 @@
+package org.apache.phoenix.coprocessor.metrics;
+
Review comment:
@jpisaac can we address this please?
##########
File path:
phoenix-core/src/main/java/org/apache/phoenix/coprocessor/metrics/MetricsPhoenixCoprocessorSourceFactory.java
##########
@@ -0,0 +1,20 @@
+package org.apache.phoenix.coprocessor.metrics;
+
+public class MetricsPhoenixCoprocessorSourceFactory {
+
+ private static final MetricsPhoenixCoprocessorSourceFactory
+ INSTANCE = new MetricsPhoenixCoprocessorSourceFactory();
+ private MetricsPhoenixTTLSource phoenixTTLSource;
+
+ public static MetricsPhoenixCoprocessorSourceFactory getInstance() {
+ return INSTANCE;
+ }
+
+ public synchronized MetricsPhoenixTTLSource getPhoenixTTLSource() {
Review comment:
@jpisaac can we address this please?
##########
File path: phoenix-core/src/it/java/org/apache/phoenix/end2end/ViewTTLIT.java
##########
@@ -608,4 +635,1353 @@ private SchemaBuilder
createLevel1TenantView(TenantViewOptions tenantViewOptions
assertSyscatHavePhoenixTTLRelatedColumns(tenantId, schemaName,
indexOnTenantViewName,
PTableType.INDEX.getSerializedValue(), 300000);
}
+
+
+ @Test public void testWithTenantViewAndNoGlobalView() throws Exception {
+ // PHOENIX TTL is set in seconds (for e.g 10 secs)
+ long phoenixTTL = 10;
+ TableOptions tableOptions = TableOptions.withDefaults();
+ tableOptions.getTableColumns().clear();
+ tableOptions.getTableColumnTypes().clear();
+
+ TenantViewOptions tenantViewOptions = TenantViewOptions.withDefaults();
+ tenantViewOptions.setTableProps(String.format("PHOENIX_TTL=%d",
phoenixTTL));
+
+ // Define the test schema.
+ final SchemaBuilder schemaBuilder = new SchemaBuilder(getUrl());
+
schemaBuilder.withTableOptions(tableOptions).withTenantViewOptions(tenantViewOptions)
+ .build();
+
+ // Define the test data.
+ DataSupplier dataSupplier = new DataSupplier() {
+
+ @Override public List<Object> getValues(int rowIndex) {
+ Random rnd = new Random();
+ String zid = String.format("00A0y000%07d", rowIndex);
+ String col7 = String.format("g%05d", rowIndex +
rnd.nextInt(MAX_ROWS));
+ String col8 = String.format("h%05d", rowIndex +
rnd.nextInt(MAX_ROWS));
+ String col9 = String.format("i%05d", rowIndex +
rnd.nextInt(MAX_ROWS));
+ return Lists.newArrayList(new Object[] { zid, col7, col8, col9
});
+ }
+ };
+
+ // Create a test data reader/writer for the above schema.
+ DataWriter dataWriter = new BasicDataWriter();
+ DataReader dataReader = new BasicDataReader();
+
+ List<String> columns = Lists.newArrayList("ZID", "COL7", "COL8",
"COL9");
+ List<String> rowKeyColumns = Lists.newArrayList("ZID");
+ String
+ tenantConnectUrl =
+ getUrl() + ';' + TENANT_ID_ATTRIB + '=' +
schemaBuilder.getDataOptions().getTenantId();
+ try (Connection writeConnection =
DriverManager.getConnection(tenantConnectUrl)) {
+ writeConnection.setAutoCommit(true);
+ dataWriter.setConnection(writeConnection);
+ dataWriter.setDataSupplier(dataSupplier);
+ dataWriter.setUpsertColumns(columns);
+ dataWriter.setRowKeyColumns(rowKeyColumns);
+
dataWriter.setTargetEntity(schemaBuilder.getEntityTenantViewName());
+
+ dataReader.setValidationColumns(columns);
+ dataReader.setRowKeyColumns(rowKeyColumns);
+ dataReader.setDML(String.format("SELECT %s from %s",
Joiner.on(",").join(columns),
+ schemaBuilder.getEntityTenantViewName()));
+
dataReader.setTargetEntity(schemaBuilder.getEntityTenantViewName());
+
+ // Validate data before and after ttl expiration.
+ upsertDataAndRunValidations(phoenixTTL, DEFAULT_NUM_ROWS,
dataWriter, dataReader,
+ schemaBuilder);
+ }
+ }
+
+ @Test public void testWithSQLUsingIndexWithCoveredColsUpdates() throws
Exception {
+
+ // PHOENIX TTL is set in seconds (for e.g 10 secs)
+ long phoenixTTL = 10;
+ TableOptions tableOptions = TableOptions.withDefaults();
+ tableOptions.getTableColumns().clear();
+ tableOptions.getTableColumnTypes().clear();
+
+ GlobalViewOptions globalViewOptions =
SchemaBuilder.GlobalViewOptions.withDefaults();
+ globalViewOptions.setTableProps(String.format("PHOENIX_TTL=%d",
phoenixTTL));
+
+ GlobalViewIndexOptions
+ globalViewIndexOptions =
+ SchemaBuilder.GlobalViewIndexOptions.withDefaults();
+ globalViewIndexOptions.setLocal(false);
+
+ TenantViewOptions tenantViewOptions = new TenantViewOptions();
+ tenantViewOptions.setTenantViewColumns(asList("ZID", "COL7", "COL8",
"COL9"));
+ tenantViewOptions
+ .setTenantViewColumnTypes(asList("CHAR(15)", "VARCHAR",
"VARCHAR", "VARCHAR"));
+
+ OtherOptions testCaseWhenAllCFMatchAndAllDefault = new OtherOptions();
+
testCaseWhenAllCFMatchAndAllDefault.setTestName("testCaseWhenAllCFMatchAndAllDefault");
+ testCaseWhenAllCFMatchAndAllDefault
+ .setTableCFs(Lists.newArrayList((String) null, null, null));
+ testCaseWhenAllCFMatchAndAllDefault
+ .setGlobalViewCFs(Lists.newArrayList((String) null, null,
null));
+ testCaseWhenAllCFMatchAndAllDefault
+ .setTenantViewCFs(Lists.newArrayList((String) null, null,
null, null));
+
+ // Define the test schema.
+ final SchemaBuilder schemaBuilder = new SchemaBuilder(getUrl());
+
schemaBuilder.withTableOptions(tableOptions).withGlobalViewOptions(globalViewOptions)
+ .withGlobalViewIndexOptions(globalViewIndexOptions)
+ .withTenantViewOptions(tenantViewOptions)
+ .withOtherOptions(testCaseWhenAllCFMatchAndAllDefault).build();
+
+ // Define the test data.
+ final List<String> outerCol4s = Lists.newArrayList();
+ DataSupplier dataSupplier = new DataSupplier() {
+ String col4ForWhereClause;
+
+ @Override public List<Object> getValues(int rowIndex) {
+ Random rnd = new Random();
+ String id = String.format("00A0y000%07d", rowIndex);
+ String zid = String.format("00B0y000%07d", rowIndex);
+ String col4 = String.format("d%05d", rowIndex +
rnd.nextInt(MAX_ROWS));
+
+ // Store the col4 data to be used later in a where clause
+ outerCol4s.add(col4);
+ String col5 = String.format("e%05d", rowIndex +
rnd.nextInt(MAX_ROWS));
+ String col6 = String.format("f%05d", rowIndex +
rnd.nextInt(MAX_ROWS));
+ String col7 = String.format("g%05d", rowIndex +
rnd.nextInt(MAX_ROWS));
+ String col8 = String.format("h%05d", rowIndex +
rnd.nextInt(MAX_ROWS));
+ String col9 = String.format("i%05d", rowIndex +
rnd.nextInt(MAX_ROWS));
+
+ return Lists
+ .newArrayList(new Object[] { id, zid, col4, col5,
col6, col7, col8, col9 });
+ }
+ };
+
+ // Create a test data reader/writer for the above schema.
+ DataWriter dataWriter = new BasicDataWriter();
+ DataReader dataReader = new BasicDataReader();
+
+ List<String> columns =
+ Lists.newArrayList("ID", "ZID", "COL4", "COL5", "COL6",
"COL7", "COL8", "COL9");
+ List<String> rowKeyColumns = Lists.newArrayList("COL6");
+ String tenantConnectUrl =
+ getUrl() + ';' + TENANT_ID_ATTRIB + '=' +
schemaBuilder.getDataOptions().getTenantId();
+ try (Connection writeConnection =
DriverManager.getConnection(tenantConnectUrl)) {
+ writeConnection.setAutoCommit(true);
+ dataWriter.setConnection(writeConnection);
+ dataWriter.setDataSupplier(dataSupplier);
+ dataWriter.setUpsertColumns(columns);
+ dataWriter.setRowKeyColumns(rowKeyColumns);
+
dataWriter.setTargetEntity(schemaBuilder.getEntityTenantViewName());
+
+ // Upsert data for validation
+ upsertData(dataWriter, DEFAULT_NUM_ROWS);
+
+ dataReader.setValidationColumns(rowKeyColumns);
+ dataReader.setRowKeyColumns(rowKeyColumns);
+ dataReader.setDML(String.format("SELECT col6 from %s where col4 =
'%s'",
+ schemaBuilder.getEntityTenantViewName(),
outerCol4s.get(1)));
+
dataReader.setTargetEntity(schemaBuilder.getEntityTenantViewName());
+
+ // Validate data before and after ttl expiration.
+ validateExpiredRowsAreNotReturnedUsingCounts(phoenixTTL,
dataReader, schemaBuilder);
+ }
+ }
+
+ /**
+ * Ensure/validate that empty columns for the index are still updated even
when covered columns
+ * are not updated.
+ *
+ * @throws Exception
+ */
+ @Test public void testWithSQLUsingIndexAndNoCoveredColsUpdates() throws
Exception {
+
+ // PHOENIX TTL is set in seconds (for e.g 10 secs)
+ long phoenixTTL = 10;
+ TableOptions tableOptions = TableOptions.withDefaults();
+ tableOptions.getTableColumns().clear();
+ tableOptions.getTableColumnTypes().clear();
+
+ GlobalViewOptions globalViewOptions =
SchemaBuilder.GlobalViewOptions.withDefaults();
+ globalViewOptions.setTableProps(String.format("PHOENIX_TTL=%d",
phoenixTTL));
+
+ GlobalViewIndexOptions globalViewIndexOptions =
+ SchemaBuilder.GlobalViewIndexOptions.withDefaults();
+ globalViewIndexOptions.setLocal(false);
+
+ TenantViewOptions tenantViewOptions = new TenantViewOptions();
+ tenantViewOptions.setTenantViewColumns(asList("ZID", "COL7", "COL8",
"COL9"));
+ tenantViewOptions
+ .setTenantViewColumnTypes(asList("CHAR(15)", "VARCHAR",
"VARCHAR", "VARCHAR"));
+
+ OtherOptions testCaseWhenAllCFMatchAndAllDefault = new OtherOptions();
+
testCaseWhenAllCFMatchAndAllDefault.setTestName("testCaseWhenAllCFMatchAndAllDefault");
+ testCaseWhenAllCFMatchAndAllDefault
+ .setTableCFs(Lists.newArrayList((String) null, null, null));
+ testCaseWhenAllCFMatchAndAllDefault
+ .setGlobalViewCFs(Lists.newArrayList((String) null, null,
null));
+ testCaseWhenAllCFMatchAndAllDefault
+ .setTenantViewCFs(Lists.newArrayList((String) null, null,
null, null));
+
+ // Define the test schema.
+ final SchemaBuilder schemaBuilder = new SchemaBuilder(getUrl());
+
schemaBuilder.withTableOptions(tableOptions).withGlobalViewOptions(globalViewOptions)
+ .withGlobalViewIndexOptions(globalViewIndexOptions)
+ .withTenantViewOptions(tenantViewOptions)
+ .withOtherOptions(testCaseWhenAllCFMatchAndAllDefault).build();
+
+ // Define the test data.
+ final List<String> outerCol4s = Lists.newArrayList();
+ DataSupplier dataSupplier = new DataSupplier() {
+
+ @Override public List<Object> getValues(int rowIndex) {
+ Random rnd = new Random();
+ String id = String.format("00A0y000%07d", rowIndex);
+ String zid = String.format("00B0y000%07d", rowIndex);
+ String col4 = String.format("d%05d", rowIndex +
rnd.nextInt(MAX_ROWS));
+
+ // Store the col4 data to be used later in a where clause
+ outerCol4s.add(col4);
+ String col5 = String.format("e%05d", rowIndex +
rnd.nextInt(MAX_ROWS));
+ String col6 = String.format("f%05d", rowIndex +
rnd.nextInt(MAX_ROWS));
+ String col7 = String.format("g%05d", rowIndex +
rnd.nextInt(MAX_ROWS));
+ String col8 = String.format("h%05d", rowIndex +
rnd.nextInt(MAX_ROWS));
+ String col9 = String.format("i%05d", rowIndex +
rnd.nextInt(MAX_ROWS));
+
+ return Lists
+ .newArrayList(new Object[] { id, zid, col4, col5,
col6, col7, col8, col9 });
+ }
+ };
+
+ // Create a test data reader/writer for the above schema.
+ DataWriter dataWriter = new BasicDataWriter();
+ DataReader dataReader = new BasicDataReader();
+
+ List<String> columns =
+ Lists.newArrayList("ID", "ZID", "COL4", "COL5", "COL6",
"COL7", "COL8", "COL9");
+ List<String> nonCoveredColumns =
+ Lists.newArrayList("ID", "ZID", "COL5", "COL7", "COL8",
"COL9");
+ List<String> rowKeyColumns = Lists.newArrayList("COL6");
+ String tenantConnectUrl =
+ getUrl() + ';' + TENANT_ID_ATTRIB + '=' +
schemaBuilder.getDataOptions().getTenantId();
+ try (Connection writeConnection =
DriverManager.getConnection(tenantConnectUrl)) {
+ writeConnection.setAutoCommit(true);
+ dataWriter.setConnection(writeConnection);
+ dataWriter.setDataSupplier(dataSupplier);
+ dataWriter.setUpsertColumns(columns);
+ dataWriter.setRowKeyColumns(rowKeyColumns);
+
dataWriter.setTargetEntity(schemaBuilder.getEntityTenantViewName());
+
+ // Upsert data for validation
+ upsertData(dataWriter, DEFAULT_NUM_ROWS);
+
+ dataReader.setValidationColumns(rowKeyColumns);
+ dataReader.setRowKeyColumns(rowKeyColumns);
+ dataReader.setDML(String.format("SELECT col6 from %s where col4 =
'%s'",
+ schemaBuilder.getEntityTenantViewName(),
outerCol4s.get(1)));
+
dataReader.setTargetEntity(schemaBuilder.getEntityTenantViewName());
+
+ // Validate data before and after ttl expiration.
+ validateExpiredRowsAreNotReturnedUsingCounts(phoenixTTL,
dataReader, schemaBuilder);
+
+ // Now update the above data but not modifying the covered columns.
+ // Ensure/validate that empty columns for the index are still
updated.
+
+ // Data supplier where covered and included (col4 and col6)
columns are not updated.
+ DataSupplier dataSupplierForNonCoveredColumns = new DataSupplier()
{
+
+ @Override public List<Object> getValues(int rowIndex) {
+ Random rnd = new Random();
+ String id = String.format("00A0y000%07d", rowIndex);
+ String zid = String.format("00B0y000%07d", rowIndex);
+ String col5 = String.format("e%05d", rowIndex +
rnd.nextInt(MAX_ROWS));
+ String col7 = String.format("g%05d", rowIndex +
rnd.nextInt(MAX_ROWS));
+ String col8 = String.format("h%05d", rowIndex +
rnd.nextInt(MAX_ROWS));
+ String col9 = String.format("i%05d", rowIndex +
rnd.nextInt(MAX_ROWS));
+
+ return Lists.newArrayList(new Object[] { id, zid, col5,
col7, col8, col9 });
+ }
+ };
+
+ // Upsert data for validation with non covered columns
+ dataWriter.setDataSupplier(dataSupplierForNonCoveredColumns);
+ dataWriter.setUpsertColumns(nonCoveredColumns);
+ upsertData(dataWriter, DEFAULT_NUM_ROWS);
+
+ List<String> rowKeyColumns1 = Lists.newArrayList("ID", "COL6");
+ dataReader.setValidationColumns(rowKeyColumns1);
+ dataReader.setRowKeyColumns(rowKeyColumns1);
+ dataReader.setDML(String.format("SELECT id, col6 from %s where
col4 = '%s'",
+ schemaBuilder.getEntityTenantViewName(),
outerCol4s.get(1)));
+
+ // Validate data before and after ttl expiration.
+ validateExpiredRowsAreNotReturnedUsingCounts(phoenixTTL,
dataReader, schemaBuilder);
+
+ }
+ }
+
+
+ /**
+ * Ensure/validate that correct parent's phoenix ttl value is used when
queries are using the
+ * index.
+ *
+ * @throws Exception
+ */
+ @Test public void testWithSQLUsingIndexAndMultiLevelViews() throws
Exception {
+
+ // PHOENIX TTL is set in seconds (for e.g 10 secs)
+ long phoenixTTL = 10;
+ TableOptions tableOptions = TableOptions.withDefaults();
+ tableOptions.getTableColumns().clear();
+ tableOptions.getTableColumnTypes().clear();
+
+ GlobalViewOptions globalViewOptions =
SchemaBuilder.GlobalViewOptions.withDefaults();
+ globalViewOptions.setTableProps(String.format("PHOENIX_TTL=%d",
phoenixTTL));
+
+ GlobalViewIndexOptions globalViewIndexOptions =
+ SchemaBuilder.GlobalViewIndexOptions.withDefaults();
+ globalViewIndexOptions.setLocal(false);
+
+ TenantViewOptions tenantViewOptions = new TenantViewOptions();
+ tenantViewOptions.setTenantViewColumns(asList("ZID", "COL7", "COL8",
"COL9"));
+ tenantViewOptions
+ .setTenantViewColumnTypes(asList("CHAR(15)", "VARCHAR",
"VARCHAR", "VARCHAR"));
+
+
+ OtherOptions testCaseWhenAllCFMatchAndAllDefault = new OtherOptions();
+
testCaseWhenAllCFMatchAndAllDefault.setTestName("testCaseWhenAllCFMatchAndAllDefault");
+ testCaseWhenAllCFMatchAndAllDefault
+ .setTableCFs(Lists.newArrayList((String) null, null, null));
+ testCaseWhenAllCFMatchAndAllDefault
+ .setGlobalViewCFs(Lists.newArrayList((String) null, null,
null));
+ testCaseWhenAllCFMatchAndAllDefault
+ .setTenantViewCFs(Lists.newArrayList((String) null, null,
null, null));
+
+ // Define the test schema.
+ final SchemaBuilder schemaBuilder = new SchemaBuilder(getUrl());
+
schemaBuilder.withTableOptions(tableOptions).withGlobalViewOptions(globalViewOptions)
+ .withGlobalViewIndexOptions(globalViewIndexOptions)
+ .withTenantViewOptions(tenantViewOptions)
+ .withOtherOptions(testCaseWhenAllCFMatchAndAllDefault).build();
+
+
+ String level3ViewName = String.format("%s.%s",
+ DEFAULT_SCHEMA_NAME, "E11");
+ String level3ViewCreateSQL = String.format("CREATE VIEW IF NOT EXISTS
%s AS SELECT * FROM %s",
+ level3ViewName,
+ schemaBuilder.getEntityTenantViewName());
+ String tConnectUrl =
+ getUrl() + ';' + TENANT_ID_ATTRIB + '=' +
schemaBuilder.getDataOptions().getTenantId();
+ try (Connection tConnection =
DriverManager.getConnection(tConnectUrl)) {
+ tConnection.createStatement().execute(level3ViewCreateSQL);
+ }
+
+
+ // Define the test data.
+ final List<String> outerCol4s = Lists.newArrayList();
+ DataSupplier dataSupplier = new DataSupplier() {
+
+ @Override public List<Object> getValues(int rowIndex) {
+ Random rnd = new Random();
+ String id = String.format("00A0y000%07d", rowIndex);
+ String zid = String.format("00B0y000%07d", rowIndex);
+ String col4 = String.format("d%05d", rowIndex +
rnd.nextInt(MAX_ROWS));
+
+ // Store the col4 data to be used later in a where clause
+ outerCol4s.add(col4);
+ String col5 = String.format("e%05d", rowIndex +
rnd.nextInt(MAX_ROWS));
+ String col6 = String.format("f%05d", rowIndex +
rnd.nextInt(MAX_ROWS));
+ String col7 = String.format("g%05d", rowIndex +
rnd.nextInt(MAX_ROWS));
+ String col8 = String.format("h%05d", rowIndex +
rnd.nextInt(MAX_ROWS));
+ String col9 = String.format("i%05d", rowIndex +
rnd.nextInt(MAX_ROWS));
+
+ return Lists
+ .newArrayList(new Object[] { id, zid, col4, col5,
col6, col7, col8, col9 });
+ }
+ };
+
+ // Create a test data reader/writer for the above schema.
+ DataWriter dataWriter = new BasicDataWriter();
+ DataReader dataReader = new BasicDataReader();
+
+ List<String> columns =
+ Lists.newArrayList("ID", "ZID", "COL4", "COL5", "COL6",
"COL7", "COL8", "COL9");
+ List<String> nonCoveredColumns =
+ Lists.newArrayList("ID", "ZID", "COL5", "COL7", "COL8",
"COL9");
+ List<String> rowKeyColumns = Lists.newArrayList("COL6");
+ String tenantConnectUrl =
+ getUrl() + ';' + TENANT_ID_ATTRIB + '=' +
schemaBuilder.getDataOptions().getTenantId();
+ try (Connection writeConnection =
DriverManager.getConnection(tenantConnectUrl)) {
+ writeConnection.setAutoCommit(true);
+ dataWriter.setConnection(writeConnection);
+ dataWriter.setDataSupplier(dataSupplier);
+ dataWriter.setUpsertColumns(columns);
+ dataWriter.setRowKeyColumns(rowKeyColumns);
+ dataWriter.setTargetEntity(level3ViewName);
+
+ // Upsert data for validation
+ upsertData(dataWriter, DEFAULT_NUM_ROWS);
+
+ dataReader.setValidationColumns(rowKeyColumns);
+ dataReader.setRowKeyColumns(rowKeyColumns);
+ dataReader.setDML(String.format("SELECT col6 from %s where col4 =
'%s'",
+ level3ViewName, outerCol4s.get(1)));
+ dataReader.setTargetEntity(level3ViewName);
+
+ // Validate data before and after ttl expiration.
+ validateExpiredRowsAreNotReturnedUsingCounts(phoenixTTL,
dataReader, schemaBuilder);
+
+ // Now update the above data but not modifying the covered columns.
+ // Ensure/validate that empty columns for the index are still
updated.
+
+ // Data supplier where covered and included (col4 and col6)
columns are not updated.
+ DataSupplier dataSupplierForNonCoveredColumns = new DataSupplier()
{
+
+ @Override public List<Object> getValues(int rowIndex) {
+ Random rnd = new Random();
+ String id = String.format("00A0y000%07d", rowIndex);
+ String zid = String.format("00B0y000%07d", rowIndex);
+ String col5 = String.format("e%05d", rowIndex +
rnd.nextInt(MAX_ROWS));
+ String col7 = String.format("g%05d", rowIndex +
rnd.nextInt(MAX_ROWS));
+ String col8 = String.format("h%05d", rowIndex +
rnd.nextInt(MAX_ROWS));
+ String col9 = String.format("i%05d", rowIndex +
rnd.nextInt(MAX_ROWS));
+
+ return Lists.newArrayList(new Object[] { id, zid, col5,
col7, col8, col9 });
+ }
+ };
+
+ // Upsert data for validation with non covered columns
+ dataWriter.setDataSupplier(dataSupplierForNonCoveredColumns);
+ dataWriter.setUpsertColumns(nonCoveredColumns);
+ upsertData(dataWriter, DEFAULT_NUM_ROWS);
+
+ List<String> rowKeyColumns1 = Lists.newArrayList("ID", "COL6");
+ dataReader.setValidationColumns(rowKeyColumns1);
+ dataReader.setRowKeyColumns(rowKeyColumns1);
+ dataReader.setDML(String.format("SELECT id, col6 from %s where
col4 = '%s'",
+ level3ViewName, outerCol4s.get(1)));
+
+ // Validate data before and after ttl expiration.
+ validateExpiredRowsAreNotReturnedUsingCounts(phoenixTTL,
dataReader, schemaBuilder);
+
+ }
+ }
+
+ @Test public void testWithVariousSQLs() throws Exception {
+
+ // PHOENIX TTL is set in seconds (for e.g 10 secs)
+ long phoenixTTL = 10;
+ TableOptions tableOptions = TableOptions.withDefaults();
+ tableOptions.getTableColumns().clear();
+ tableOptions.getTableColumnTypes().clear();
+
+ GlobalViewOptions globalViewOptions =
SchemaBuilder.GlobalViewOptions.withDefaults();
+ globalViewOptions.setTableProps(String.format("PHOENIX_TTL=%d",
phoenixTTL));
+
+ GlobalViewIndexOptions globalViewIndexOptions =
+ SchemaBuilder.GlobalViewIndexOptions.withDefaults();
+ globalViewIndexOptions.setLocal(false);
+
+ TenantViewOptions tenantViewOptions = new TenantViewOptions();
+ tenantViewOptions.setTenantViewColumns(asList("ZID", "COL7", "COL8",
"COL9"));
+ tenantViewOptions.setTenantViewColumnTypes(asList("CHAR(15)",
"VARCHAR", "VARCHAR", "VARCHAR"));
+
+ OtherOptions testCaseWhenAllCFMatchAndAllDefault = new OtherOptions();
+
testCaseWhenAllCFMatchAndAllDefault.setTestName("testCaseWhenAllCFMatchAndAllDefault");
+ testCaseWhenAllCFMatchAndAllDefault
+ .setTableCFs(Lists.newArrayList((String) null, null, null));
+ testCaseWhenAllCFMatchAndAllDefault
+ .setGlobalViewCFs(Lists.newArrayList((String) null, null,
null));
+ testCaseWhenAllCFMatchAndAllDefault
+ .setTenantViewCFs(Lists.newArrayList((String) null, null,
null, null));
+
+ // Define the test schema.
+ final SchemaBuilder schemaBuilder = new SchemaBuilder(getUrl());
+
schemaBuilder.withTableOptions(tableOptions).withGlobalViewOptions(globalViewOptions)
+ .withGlobalViewIndexOptions(globalViewIndexOptions)
+ .withTenantViewOptions(tenantViewOptions)
+ .withOtherOptions(testCaseWhenAllCFMatchAndAllDefault).build();
+
+ // Define the test data.
+ final String groupById = String.format("00A0y000%07d", 0);
+ DataSupplier dataSupplier = new DataSupplier() {
+
+ @Override public List<Object> getValues(int rowIndex) {
+ Random rnd = new Random();
+ String zid = String.format("00B0y000%07d", rowIndex);
+ String col4 = String.format("d%05d", rowIndex +
rnd.nextInt(MAX_ROWS));
+ String col5 = String.format("e%05d", rowIndex +
rnd.nextInt(MAX_ROWS));
+ String col6 = String.format("f%05d", rowIndex +
rnd.nextInt(MAX_ROWS));
+ String col7 = String.format("g%05d", rowIndex +
rnd.nextInt(MAX_ROWS));
+ String col8 = String.format("h%05d", rowIndex +
rnd.nextInt(MAX_ROWS));
+ String col9 = String.format("i%05d", rowIndex +
rnd.nextInt(MAX_ROWS));
+
+ return Lists.newArrayList(
+ new Object[] { groupById, zid, col4, col5, col6, col7,
col8, col9 });
+ }
+ };
+
+ // Create a test data reader/writer for the above schema.
+ DataWriter dataWriter = new BasicDataWriter();
+ List<String> columns =
+ Lists.newArrayList("ID", "ZID", "COL4", "COL5", "COL6",
"COL7", "COL8", "COL9");
+ List<String> rowKeyColumns = Lists.newArrayList("ID", "ZID");
+ String tenantConnectUrl =
+ getUrl() + ';' + TENANT_ID_ATTRIB + '=' +
schemaBuilder.getDataOptions().getTenantId();
+ try (Connection writeConnection =
DriverManager.getConnection(tenantConnectUrl)) {
+ writeConnection.setAutoCommit(true);
+ dataWriter.setConnection(writeConnection);
+ dataWriter.setDataSupplier(dataSupplier);
+ dataWriter.setUpsertColumns(columns);
+ dataWriter.setRowKeyColumns(rowKeyColumns);
+
dataWriter.setTargetEntity(schemaBuilder.getEntityTenantViewName());
+ upsertData(dataWriter, DEFAULT_NUM_ROWS);
+
+ // Case : count(1) sql
+ DataReader dataReader = new BasicDataReader();
+ dataReader.setValidationColumns(Arrays.asList("num_rows"));
+ dataReader.setRowKeyColumns(Arrays.asList("num_rows"));
+ dataReader.setDML(String
+ .format("SELECT count(1) as num_rows from %s HAVING
count(1) > 0",
+ schemaBuilder.getEntityTenantViewName()));
+
dataReader.setTargetEntity(schemaBuilder.getEntityTenantViewName());
+
+ // Validate data before and after ttl expiration.
+ validateExpiredRowsAreNotReturnedUsingCounts(phoenixTTL,
dataReader, schemaBuilder);
+
+ // Case : group by sql
+ dataReader.setValidationColumns(Arrays.asList("num_rows"));
+ dataReader.setRowKeyColumns(Arrays.asList("num_rows"));
+ dataReader.setDML(String
+ .format("SELECT count(1) as num_rows from %s GROUP BY ID
HAVING count(1) > 0",
+ schemaBuilder.getEntityTenantViewName(),
groupById));
+
+
dataReader.setTargetEntity(schemaBuilder.getEntityTenantViewName());
+
+ // Validate data before and after ttl expiration.
+ validateExpiredRowsAreNotReturnedUsingCounts(phoenixTTL,
dataReader, schemaBuilder);
+ }
+ }
+
+ @Test public void testWithVariousSQLsForMultipleTenants() throws Exception
{
+
+ // PHOENIX TTL is set in seconds (for e.g 10 secs)
+ long phoenixTTL = 10;
+ TableOptions tableOptions = TableOptions.withDefaults();
+ tableOptions.getTableColumns().clear();
+ tableOptions.getTableColumnTypes().clear();
+
+ GlobalViewOptions globalViewOptions =
SchemaBuilder.GlobalViewOptions.withDefaults();
+ globalViewOptions.setTableProps(String.format("PHOENIX_TTL=%d",
phoenixTTL));
+
+ GlobalViewIndexOptions globalViewIndexOptions =
+ SchemaBuilder.GlobalViewIndexOptions.withDefaults();
+ globalViewIndexOptions.setLocal(false);
+
+ TenantViewOptions tenantViewOptions = new TenantViewOptions();
+ tenantViewOptions.setTenantViewColumns(asList("ZID", "COL7", "COL8",
"COL9"));
+ tenantViewOptions
+ .setTenantViewColumnTypes(asList("CHAR(15)", "VARCHAR",
"VARCHAR", "VARCHAR"));
+
+ OtherOptions testCaseWhenAllCFMatchAndAllDefault = new OtherOptions();
+
testCaseWhenAllCFMatchAndAllDefault.setTestName("testCaseWhenAllCFMatchAndAllDefault");
+ testCaseWhenAllCFMatchAndAllDefault
+ .setTableCFs(Lists.newArrayList((String) null, null, null));
+ testCaseWhenAllCFMatchAndAllDefault
+ .setGlobalViewCFs(Lists.newArrayList((String) null, null,
null));
+ testCaseWhenAllCFMatchAndAllDefault
+ .setTenantViewCFs(Lists.newArrayList((String) null, null,
null, null));
+
+ final SchemaBuilder schemaBuilder = new SchemaBuilder(getUrl());
+
schemaBuilder.withTableOptions(tableOptions).withGlobalViewOptions(globalViewOptions)
+ .withGlobalViewIndexOptions(globalViewIndexOptions)
+ .withTenantViewOptions(tenantViewOptions)
+ .withOtherOptions(testCaseWhenAllCFMatchAndAllDefault);
+
+ for (int tenant : Arrays.asList(new Integer[] { 1, 2, 3 })) {
+ // build schema for tenant
+ schemaBuilder.buildWithNewTenant();
+
+ // Define the test data.
+ final String groupById = String.format("00A0y000%07d", 0);
+ DataSupplier dataSupplier = new DataSupplier() {
+
+ @Override public List<Object> getValues(int rowIndex) {
+ Random rnd = new Random();
+ String zid = String.format("00B0y000%07d", rowIndex);
+ String col4 = String.format("d%05d", rowIndex +
rnd.nextInt(MAX_ROWS));
+ String col5 = String.format("e%05d", rowIndex +
rnd.nextInt(MAX_ROWS));
+ String col6 = String.format("f%05d", rowIndex +
rnd.nextInt(MAX_ROWS));
+ String col7 = String.format("g%05d", rowIndex +
rnd.nextInt(MAX_ROWS));
+ String col8 = String.format("h%05d", rowIndex +
rnd.nextInt(MAX_ROWS));
+ String col9 = String.format("i%05d", rowIndex +
rnd.nextInt(MAX_ROWS));
+
+ return Lists.newArrayList(
+ new Object[] { groupById, zid, col4, col5, col6,
col7, col8, col9 });
+ }
+ };
+
+ // Create a test data reader/writer for the above schema.
+ DataWriter dataWriter = new BasicDataWriter();
+ List<String> columns =
+ Lists.newArrayList("ID", "ZID", "COL4", "COL5", "COL6",
"COL7", "COL8", "COL9");
+ List<String> rowKeyColumns = Lists.newArrayList("ID", "ZID");
+ String tenantConnectUrl =
+ getUrl() + ';' + TENANT_ID_ATTRIB + '=' +
schemaBuilder.getDataOptions().getTenantId();
+ try (Connection writeConnection =
DriverManager.getConnection(tenantConnectUrl)) {
+ writeConnection.setAutoCommit(true);
+ dataWriter.setConnection(writeConnection);
+ dataWriter.setDataSupplier(dataSupplier);
+ dataWriter.setUpsertColumns(columns);
+ dataWriter.setRowKeyColumns(rowKeyColumns);
+
dataWriter.setTargetEntity(schemaBuilder.getEntityTenantViewName());
+ upsertData(dataWriter, DEFAULT_NUM_ROWS);
+
+ // Case : count(1) sql
+ DataReader dataReader = new BasicDataReader();
+ dataReader.setValidationColumns(Arrays.asList("num_rows"));
+ dataReader.setRowKeyColumns(Arrays.asList("num_rows"));
+ dataReader.setDML(String
+ .format("SELECT count(1) as num_rows from %s HAVING
count(1) > 0",
+ schemaBuilder.getEntityTenantViewName()));
+
dataReader.setTargetEntity(schemaBuilder.getEntityTenantViewName());
+
+ // Validate data before and after ttl expiration.
+ validateExpiredRowsAreNotReturnedUsingCounts(phoenixTTL,
dataReader, schemaBuilder);
+
+ // Case : group by sql
+ dataReader.setValidationColumns(Arrays.asList("num_rows"));
+ dataReader.setRowKeyColumns(Arrays.asList("num_rows"));
+ dataReader.setDML(String
+ .format("SELECT count(1) as num_rows from %s GROUP BY
ID HAVING count(1) > 0",
+ schemaBuilder.getEntityTenantViewName()));
+
+
dataReader.setTargetEntity(schemaBuilder.getEntityTenantViewName());
+
+ // Validate data before and after ttl expiration.
+ validateExpiredRowsAreNotReturnedUsingCounts(phoenixTTL,
dataReader, schemaBuilder);
+ }
+ }
+ }
+
+ @Test public void testWithVariousSQLsForMultipleViews() throws Exception {
+
+ // PHOENIX TTL is set in seconds (for e.g 10 secs)
+ long phoenixTTL = 10;
+ TableOptions tableOptions = TableOptions.withDefaults();
+ tableOptions.getTableColumns().clear();
+ tableOptions.getTableColumnTypes().clear();
+
+ TenantViewOptions tenantViewOptions = new TenantViewOptions();
+ tenantViewOptions.setTenantViewColumns(asList("ZID", "COL7", "COL8",
"COL9"));
+ tenantViewOptions
+ .setTenantViewColumnTypes(asList("CHAR(15)", "VARCHAR",
"VARCHAR", "VARCHAR"));
+
+ tenantViewOptions.setTableProps(String.format("PHOENIX_TTL=%d",
phoenixTTL));
+
+ OtherOptions testCaseWhenAllCFMatchAndAllDefault = new OtherOptions();
+
testCaseWhenAllCFMatchAndAllDefault.setTestName("testCaseWhenAllCFMatchAndAllDefault");
+ testCaseWhenAllCFMatchAndAllDefault
+ .setTableCFs(Lists.newArrayList((String) null, null, null));
+ testCaseWhenAllCFMatchAndAllDefault
+ .setGlobalViewCFs(Lists.newArrayList((String) null, null,
null));
+ testCaseWhenAllCFMatchAndAllDefault
+ .setTenantViewCFs(Lists.newArrayList((String) null, null,
null, null));
+
+ final SchemaBuilder schemaBuilder = new SchemaBuilder(getUrl());
+
schemaBuilder.withTableOptions(tableOptions).withTenantViewOptions(tenantViewOptions)
+ .withOtherOptions(testCaseWhenAllCFMatchAndAllDefault);
+
+ for (int view : Arrays.asList(new Integer[] { 1, 2, 3 })) {
+ // build schema for new view
+ schemaBuilder.buildNewView();
+
+ // Define the test data.
+ DataSupplier dataSupplier = new DataSupplier() {
+
+ @Override public List<Object> getValues(int rowIndex) {
+ Random rnd = new Random();
+ String zid = String.format("00B0y000%07d", rowIndex);
+ String col7 = String.format("g%05d", rowIndex +
rnd.nextInt(MAX_ROWS));
+ String col8 = String.format("h%05d", rowIndex +
rnd.nextInt(MAX_ROWS));
+ String col9 = String.format("i%05d", rowIndex +
rnd.nextInt(MAX_ROWS));
+
+ return Lists.newArrayList(new Object[] { zid, col7, col8,
col9 });
+ }
+ };
+
+ // Create a test data reader/writer for the above schema.
+ DataWriter dataWriter = new BasicDataWriter();
+ List<String> columns = Lists.newArrayList("ZID", "COL7", "COL8",
"COL9");
+ List<String> rowKeyColumns = Lists.newArrayList("ZID");
+ String tenantConnectUrl =
+ getUrl() + ';' + TENANT_ID_ATTRIB + '=' +
schemaBuilder.getDataOptions().getTenantId();
+ try (Connection writeConnection =
DriverManager.getConnection(tenantConnectUrl)) {
+ writeConnection.setAutoCommit(true);
+ dataWriter.setConnection(writeConnection);
+ dataWriter.setDataSupplier(dataSupplier);
+ dataWriter.setUpsertColumns(columns);
+ dataWriter.setRowKeyColumns(rowKeyColumns);
+
dataWriter.setTargetEntity(schemaBuilder.getEntityTenantViewName());
+ upsertData(dataWriter, DEFAULT_NUM_ROWS);
+
+ // Case : count(1) sql
+ DataReader dataReader = new BasicDataReader();
+ dataReader.setValidationColumns(Arrays.asList("num_rows"));
+ dataReader.setRowKeyColumns(Arrays.asList("num_rows"));
+ dataReader.setDML(String
+ .format("SELECT count(1) as num_rows from %s HAVING
count(1) > 0",
+ schemaBuilder.getEntityTenantViewName()));
+
dataReader.setTargetEntity(schemaBuilder.getEntityTenantViewName());
+
+ // Validate data before and after ttl expiration.
+ validateExpiredRowsAreNotReturnedUsingCounts(phoenixTTL,
dataReader, schemaBuilder);
+
+ // Case : group by sql
+ dataReader.setValidationColumns(Arrays.asList("num_rows"));
+ dataReader.setRowKeyColumns(Arrays.asList("num_rows"));
+ dataReader.setDML(String
+ .format("SELECT count(1) as num_rows from %s GROUP BY
ZID HAVING count(1) > 0",
+ schemaBuilder.getEntityTenantViewName()));
+
+
dataReader.setTargetEntity(schemaBuilder.getEntityTenantViewName());
+
+ // Validate data before and after ttl expiration.
+ validateExpiredRowsAreNotReturnedUsingCounts(phoenixTTL,
dataReader, schemaBuilder);
+ }
+ }
+ }
+
+ @Test public void testWithTenantViewAndGlobalViewAndVariousOptions()
throws Exception {
+ // PHOENIX TTL is set in seconds (for e.g 10 secs)
+ long phoenixTTL = 10;
+
+ // Define the test schema
+ TableOptions tableOptions = TableOptions.withDefaults();
+
+ GlobalViewOptions globalViewOptions =
SchemaBuilder.GlobalViewOptions.withDefaults();
+ globalViewOptions.setTableProps(String.format("PHOENIX_TTL=%d",
phoenixTTL));
+
+ GlobalViewIndexOptions globalViewIndexOptions =
GlobalViewIndexOptions.withDefaults();
+
+ TenantViewOptions tenantViewOptions = new TenantViewOptions();
+ tenantViewOptions.setTenantViewColumns(asList("ZID", "COL7", "COL8",
"COL9"));
+ tenantViewOptions
+ .setTenantViewColumnTypes(asList("CHAR(15)", "VARCHAR",
"VARCHAR", "VARCHAR"));
+
+ TenantViewIndexOptions tenantViewIndexOptions =
TenantViewIndexOptions.withDefaults();
+
+ OtherOptions testCaseWhenAllCFMatchAndAllDefault = new OtherOptions();
+
testCaseWhenAllCFMatchAndAllDefault.setTestName("testCaseWhenAllCFMatchAndAllDefault");
+ testCaseWhenAllCFMatchAndAllDefault
+ .setTableCFs(Lists.newArrayList((String) null, null, null));
+ testCaseWhenAllCFMatchAndAllDefault
+ .setGlobalViewCFs(Lists.newArrayList((String) null, null,
null));
+ testCaseWhenAllCFMatchAndAllDefault
+ .setTenantViewCFs(Lists.newArrayList((String) null, null,
null, null));
+
+ for (String additionalProps : Lists
+ .newArrayList("COLUMN_ENCODED_BYTES=0",
"DEFAULT_COLUMN_FAMILY='0'")) {
+
+ StringBuilder withTableProps = new StringBuilder();
+
withTableProps.append("MULTI_TENANT=true,").append(additionalProps);
+
+ for (boolean isGlobalViewLocal : Lists.newArrayList(true, false)) {
+ for (boolean isTenantViewLocal : Lists.newArrayList(true,
false)) {
+
+ tableOptions.setTableProps(withTableProps.toString());
+ globalViewIndexOptions.setLocal(isGlobalViewLocal);
+ tenantViewIndexOptions.setLocal(isTenantViewLocal);
+ OtherOptions otherOptions =
testCaseWhenAllCFMatchAndAllDefault;
+
+ final SchemaBuilder schemaBuilder = new
SchemaBuilder(getUrl());
+ schemaBuilder.withTableOptions(tableOptions)
+ .withGlobalViewOptions(globalViewOptions)
+ .withGlobalViewIndexOptions(globalViewIndexOptions)
+ .withTenantViewOptions(tenantViewOptions)
+ .withTenantViewIndexOptions(tenantViewIndexOptions)
+
.withOtherOptions(testCaseWhenAllCFMatchAndAllDefault)
+ .buildWithNewTenant();
+
+ // Define the test data.
+ DataSupplier dataSupplier = new DataSupplier() {
+
+ @Override public List<Object> getValues(int rowIndex) {
+ Random rnd = new Random();
+ String id = String.format("00A0y000%07d",
rowIndex);
+ String zid = String.format("00B0y000%07d",
rowIndex);
+ String col1 = String.format("a%05d", rowIndex +
rnd.nextInt(MAX_ROWS));
+ String col2 = String.format("b%05d", rowIndex +
rnd.nextInt(MAX_ROWS));
+ String col3 = String.format("c%05d", rowIndex +
rnd.nextInt(MAX_ROWS));
+ String col4 = String.format("d%05d", rowIndex +
rnd.nextInt(MAX_ROWS));
+ String col5 = String.format("e%05d", rowIndex +
rnd.nextInt(MAX_ROWS));
+ String col6 = String.format("f%05d", rowIndex +
rnd.nextInt(MAX_ROWS));
+ String col7 = String.format("g%05d", rowIndex +
rnd.nextInt(MAX_ROWS));
+ String col8 = String.format("h%05d", rowIndex +
rnd.nextInt(MAX_ROWS));
+ String col9 = String.format("i%05d", rowIndex +
rnd.nextInt(MAX_ROWS));
+
+ return Lists.newArrayList(
+ new Object[] { id, zid, col1, col2, col3,
col4, col5, col6,
+ col7, col8, col9 });
+ }
+ };
+
+ // Create a test data reader/writer for the above schema.
+ DataWriter dataWriter = new BasicDataWriter();
+ DataReader dataReader = new BasicDataReader();
+
+ List<String> columns =
+ Lists.newArrayList("ID", "ZID",
+ "COL1", "COL2", "COL3", "COL4", "COL5",
+ "COL6", "COL7", "COL8", "COL9");
+ List<String> rowKeyColumns = Lists.newArrayList("ID",
"ZID");
+ String tenantConnectUrl =
+ getUrl() + ';' + TENANT_ID_ATTRIB + '=' +
+
schemaBuilder.getDataOptions().getTenantId();
+ try (Connection writeConnection = DriverManager
+ .getConnection(tenantConnectUrl)) {
+ writeConnection.setAutoCommit(true);
+ dataWriter.setConnection(writeConnection);
+ dataWriter.setDataSupplier(dataSupplier);
+ dataWriter.setUpsertColumns(columns);
+ dataWriter.setRowKeyColumns(rowKeyColumns);
+
dataWriter.setTargetEntity(schemaBuilder.getEntityTenantViewName());
+
+ dataReader.setValidationColumns(columns);
+ dataReader.setRowKeyColumns(rowKeyColumns);
+ dataReader.setDML(String
+ .format("SELECT %s from %s",
Joiner.on(",").join(columns),
+
schemaBuilder.getEntityTenantViewName()));
+
dataReader.setTargetEntity(schemaBuilder.getEntityTenantViewName());
+
+ // Validate data before and after ttl expiration.
+ upsertDataAndRunValidations(phoenixTTL,
DEFAULT_NUM_ROWS, dataWriter,
+ dataReader, schemaBuilder);
+ }
+
+ long scnTimestamp =
EnvironmentEdgeManager.currentTimeMillis() +
+ (phoenixTTL * 1000);
+ // Delete expired rows.
+ deleteData(schemaBuilder, scnTimestamp);
+
+ // Verify after deleting TTL expired data.
+ Properties props = new Properties();
+ props.setProperty("CurrentSCN",
Long.toString(scnTimestamp));
+
+ try (Connection readConnection = DriverManager
+ .getConnection(tenantConnectUrl, props)) {
+
+ dataReader.setConnection(readConnection);
+ com.google.common.collect.Table<String, String, Object>
+ fetchedData =
+ fetchData(dataReader);
+ assertTrue("Deleted rows should not be fetched",
+ fetchedData.rowKeySet().size() == 0);
+ }
+ }
+ }
+ }
+ }
+
+ /**
+ * ************************************************************
+ * Case 1: Build schema with TTL set by the tenant view.
+ * TTL for GLOBAL_VIEW - 300000ms (not set)
+ * TTL for TENANT_VIEW - 300000ms
+ * ************************************************************
+ */
+
+ @Test public void testGlobalAndTenantViewTTLInheritance1() throws
Exception {
+ // PHOENIX TTL is set in seconds (for e.g 200 secs)
+ long tenantPhoenixTTL = 200;
+
+ // Define the test schema.
+ // 1. Table with default columns => (ORG_ID, KP, COL1, COL2, COL3), PK
=> (ORG_ID, KP)
+ // 2. GlobalView with default columns => (ID, COL4, COL5, COL6), PK =>
(ID)
+ // 3. Tenant with default columns => (ZID, COL7, COL8, COL9), PK =>
(ZID)
+ final SchemaBuilder schemaBuilder = new SchemaBuilder(getUrl());
+
+ TableOptions tableOptions = TableOptions.withDefaults();
+ tableOptions.setTableProps("COLUMN_ENCODED_BYTES=0,MULTI_TENANT=true");
+
+ SchemaBuilder.GlobalViewOptions globalViewOptions =
+ SchemaBuilder.GlobalViewOptions.withDefaults();
+
+ SchemaBuilder.GlobalViewIndexOptions globalViewIndexOptions =
+ SchemaBuilder.GlobalViewIndexOptions.withDefaults();
+ globalViewIndexOptions.setLocal(false);
+
+ TenantViewOptions tenantViewWithOverrideOptions = new
TenantViewOptions();
+ tenantViewWithOverrideOptions.setTenantViewColumns(asList("ZID",
"COL7", "COL8", "COL9"));
+ tenantViewWithOverrideOptions
+ .setTenantViewColumnTypes(asList("CHAR(15)", "VARCHAR",
"VARCHAR", "VARCHAR"));
+
tenantViewWithOverrideOptions.setTableProps(String.format("PHOENIX_TTL=%d",
tenantPhoenixTTL));
+
+ OtherOptions testCaseWhenAllCFMatchAndAllDefault = new OtherOptions();
+
testCaseWhenAllCFMatchAndAllDefault.setTestName("testCaseWhenAllCFMatchAndAllDefault");
+ testCaseWhenAllCFMatchAndAllDefault
+ .setTableCFs(Lists.newArrayList((String) null, null, null));
+ testCaseWhenAllCFMatchAndAllDefault
+ .setGlobalViewCFs(Lists.newArrayList((String) null, null,
null));
+ testCaseWhenAllCFMatchAndAllDefault
+ .setTenantViewCFs(Lists.newArrayList((String) null, null,
null, null));
+
+ /**
+ * ************************************************************
+ * Case 1: Build schema with TTL set by the tenant view.
+ * TTL for GLOBAL_VIEW - 300000ms (not set)
+ * TTL for TENANT_VIEW - 30000ms
+ * ************************************************************
+ */
+
+
schemaBuilder.withTableOptions(tableOptions).withGlobalViewOptions(globalViewOptions)
+ .withGlobalViewIndexOptions(globalViewIndexOptions)
+
.withTenantViewOptions(tenantViewWithOverrideOptions).withTenantViewIndexDefaults()
+
.withOtherOptions(testCaseWhenAllCFMatchAndAllDefault).buildWithNewTenant();
+
+ // Define the test data.
+ final String id = String.format("00A0y000%07d", 0);
+ DataSupplier dataSupplier = new DataSupplier() {
+
+ @Override public List<Object> getValues(int rowIndex) {
+ Random rnd = new Random();
+ String zid = String.format("00B0y000%07d", rowIndex);
+ String col1 = String.format("a%05d", rowIndex +
rnd.nextInt(MAX_ROWS));
+ String col2 = String.format("b%05d", rowIndex +
rnd.nextInt(MAX_ROWS));
+ String col3 = String.format("c%05d", rowIndex +
rnd.nextInt(MAX_ROWS));
+ String col4 = String.format("d%05d", rowIndex +
rnd.nextInt(MAX_ROWS));
+ String col5 = String.format("e%05d", rowIndex +
rnd.nextInt(MAX_ROWS));
+ String col6 = String.format("f%05d", rowIndex +
rnd.nextInt(MAX_ROWS));
+ String col7 = String.format("g%05d", rowIndex +
rnd.nextInt(MAX_ROWS));
+ String col8 = String.format("h%05d", rowIndex +
rnd.nextInt(MAX_ROWS));
+ String col9 = String.format("i%05d", rowIndex +
rnd.nextInt(MAX_ROWS));
+
+ return Lists.newArrayList(
+ new Object[] { id, zid, col1, col2, col3, col4, col5,
col6, col7, col8,
+ col9 });
+ }
+ };
+
+ // Create a test data reader/writer for the above schema.
+ DataWriter dataWriter = new BasicDataWriter();
+ List<String> columns =
+ Lists.newArrayList("ID", "ZID",
+ "COL1", "COL2", "COL3", "COL4", "COL5", "COL6",
+ "COL7", "COL8", "COL9");
+ List<String> rowKeyColumns = Lists.newArrayList("ID", "ZID");
+ String tenant1ConnectUrl =
+ getUrl() + ';' + TENANT_ID_ATTRIB + '=' +
schemaBuilder.getDataOptions().getTenantId();
+ try (Connection writeConnection =
DriverManager.getConnection(tenant1ConnectUrl)) {
+ writeConnection.setAutoCommit(true);
+ dataWriter.setConnection(writeConnection);
+ dataWriter.setDataSupplier(dataSupplier);
+ dataWriter.setUpsertColumns(columns);
+ dataWriter.setRowKeyColumns(rowKeyColumns);
+
dataWriter.setTargetEntity(schemaBuilder.getEntityTenantViewName());
+ upsertData(dataWriter, DEFAULT_NUM_ROWS);
+
+ // Case : count(1) sql
+ DataReader dataReader = new BasicDataReader();
+ dataReader.setValidationColumns(Arrays.asList("num_rows"));
+ dataReader.setRowKeyColumns(Arrays.asList("num_rows"));
+ dataReader.setDML(String
+ .format("SELECT count(1) as num_rows from %s HAVING
count(1) > 0",
+ schemaBuilder.getEntityTenantViewName()));
+
dataReader.setTargetEntity(schemaBuilder.getEntityTenantViewName());
+
+ // Validate data exists before ttl expiration.
+ long probeTimestamp = EnvironmentEdgeManager.currentTimeMillis()
+ + ((tenantPhoenixTTL * 1000) / 2);
+ validateRowsAreNotMaskedUsingCounts(probeTimestamp, dataReader,
schemaBuilder);
+ // Validate data before and after ttl expiration.
+ // Use the tenant phoenix ttl since that is what the view has set.
+ validateExpiredRowsAreNotReturnedUsingCounts(tenantPhoenixTTL,
dataReader, schemaBuilder);
+ }
+ }
+
+ /**
+ * ************************************************************
+ * Case 2: Build schema with TTL set by the global view.
+ * TTL for GLOBAL_VIEW - 300000ms
+ * TTL for TENANT_VIEW - 300000ms (not set, uses global view ttl)
+ * ************************************************************
+ */
+
+ @Test public void testGlobalAndTenantViewTTLInheritance2() throws
Exception {
+ // PHOENIX TTL is set in seconds (for e.g 300 secs)
+ long globalPhoenixTTL = 300;
+
+ // Define the test schema.
+ // 1. Table with default columns => (ORG_ID, KP, COL1, COL2, COL3), PK
=> (ORG_ID, KP)
+ // 2. GlobalView with default columns => (ID, COL4, COL5, COL6), PK =>
(ID)
+ // 3. Tenant with default columns => (ZID, COL7, COL8, COL9), PK =>
(ZID)
+ final SchemaBuilder schemaBuilder = new SchemaBuilder(getUrl());
+
+ TableOptions tableOptions = TableOptions.withDefaults();
+ tableOptions.setTableProps("COLUMN_ENCODED_BYTES=0,MULTI_TENANT=true");
+
+ SchemaBuilder.GlobalViewOptions globalViewOptions =
+ SchemaBuilder.GlobalViewOptions.withDefaults();
+ globalViewOptions.setTableProps(String.format("PHOENIX_TTL=%d",
globalPhoenixTTL));
+
+ SchemaBuilder.GlobalViewIndexOptions globalViewIndexOptions =
+ SchemaBuilder.GlobalViewIndexOptions.withDefaults();
+ globalViewIndexOptions.setLocal(false);
+
+ TenantViewOptions tenantViewWithOverrideOptions = new
TenantViewOptions();
+ tenantViewWithOverrideOptions.setTenantViewColumns(asList("ZID",
"COL7", "COL8", "COL9"));
+ tenantViewWithOverrideOptions
+ .setTenantViewColumnTypes(asList("CHAR(15)", "VARCHAR",
"VARCHAR", "VARCHAR"));
+
+ OtherOptions testCaseWhenAllCFMatchAndAllDefault = new OtherOptions();
+
testCaseWhenAllCFMatchAndAllDefault.setTestName("testCaseWhenAllCFMatchAndAllDefault");
+ testCaseWhenAllCFMatchAndAllDefault
+ .setTableCFs(Lists.newArrayList((String) null, null, null));
+ testCaseWhenAllCFMatchAndAllDefault
+ .setGlobalViewCFs(Lists.newArrayList((String) null, null,
null));
+ testCaseWhenAllCFMatchAndAllDefault
+ .setTenantViewCFs(Lists.newArrayList((String) null, null,
null, null));
+
+ /**
+ * ************************************************************
+ * Case 2: Build schema with TTL set by the global view.
+ * TTL for GLOBAL_VIEW - 300000ms
+ * TTL for TENANT_VIEW - 300000ms (not set, uses global view ttl)
+ * ************************************************************
+ */
+
+
schemaBuilder.withTableOptions(tableOptions).withGlobalViewOptions(globalViewOptions)
+ .withGlobalViewIndexOptions(globalViewIndexOptions)
+
.withTenantViewOptions(tenantViewWithOverrideOptions).withTenantViewIndexDefaults()
+
.withOtherOptions(testCaseWhenAllCFMatchAndAllDefault).buildWithNewTenant();
+
+ // Define the test data.
+ final String id = String.format("00A0y000%07d", 0);
+ DataSupplier dataSupplier = new DataSupplier() {
+
+ @Override public List<Object> getValues(int rowIndex) {
+ Random rnd = new Random();
+ String zid = String.format("00B0y000%07d", rowIndex);
+ String col1 = String.format("a%05d", rowIndex +
rnd.nextInt(MAX_ROWS));
+ String col2 = String.format("b%05d", rowIndex +
rnd.nextInt(MAX_ROWS));
+ String col3 = String.format("c%05d", rowIndex +
rnd.nextInt(MAX_ROWS));
+ String col4 = String.format("d%05d", rowIndex +
rnd.nextInt(MAX_ROWS));
+ String col5 = String.format("e%05d", rowIndex +
rnd.nextInt(MAX_ROWS));
+ String col6 = String.format("f%05d", rowIndex +
rnd.nextInt(MAX_ROWS));
+ String col7 = String.format("g%05d", rowIndex +
rnd.nextInt(MAX_ROWS));
+ String col8 = String.format("h%05d", rowIndex +
rnd.nextInt(MAX_ROWS));
+ String col9 = String.format("i%05d", rowIndex +
rnd.nextInt(MAX_ROWS));
+
+ return Lists.newArrayList(
+ new Object[] { id, zid, col1, col2, col3, col4, col5,
col6, col7, col8,
+ col9 });
+ }
+ };
+
+ // Create a test data reader/writer for the above schema.
+ DataWriter dataWriter = new BasicDataWriter();
+ List<String> columns =
+ Lists.newArrayList("ID", "ZID",
+ "COL1", "COL2", "COL3", "COL4", "COL5", "COL6",
+ "COL7", "COL8", "COL9");
+ List<String> rowKeyColumns = Lists.newArrayList("ID", "ZID");
+ String tenant1ConnectUrl =
+ getUrl() + ';' + TENANT_ID_ATTRIB + '=' +
schemaBuilder.getDataOptions().getTenantId();
+ try (Connection writeConnection =
DriverManager.getConnection(tenant1ConnectUrl)) {
+ writeConnection.setAutoCommit(true);
+ dataWriter.setConnection(writeConnection);
+ dataWriter.setDataSupplier(dataSupplier);
+ dataWriter.setUpsertColumns(columns);
+ dataWriter.setRowKeyColumns(rowKeyColumns);
+
dataWriter.setTargetEntity(schemaBuilder.getEntityTenantViewName());
+ upsertData(dataWriter, DEFAULT_NUM_ROWS);
+
+ // Case : count(1) sql
+ DataReader dataReader = new BasicDataReader();
+ dataReader.setValidationColumns(Arrays.asList("num_rows"));
+ dataReader.setRowKeyColumns(Arrays.asList("num_rows"));
+ dataReader.setDML(String
+ .format("SELECT count(1) as num_rows from %s HAVING
count(1) > 0",
+ schemaBuilder.getEntityTenantViewName()));
+
dataReader.setTargetEntity(schemaBuilder.getEntityTenantViewName());
+
+ // Validate data exists before ttl expiration.
+ long probeTimestamp = EnvironmentEdgeManager.currentTimeMillis()
+ + ((globalPhoenixTTL * 1000) / 2);
+ validateRowsAreNotMaskedUsingCounts(probeTimestamp, dataReader,
schemaBuilder);
+ // Validate data before and after ttl expiration.
+ // Use the global phoenix ttl since that is what the view has
inherited.
+ validateExpiredRowsAreNotReturnedUsingCounts(globalPhoenixTTL,
dataReader, schemaBuilder);
+ }
+ }
+
+ @Test public void testDeleteIfExpiredOnTenantView() throws Exception {
+
+ // PHOENIX TTL is set in seconds (for e.g 10 secs)
+ long phoenixTTL = 10;
+ TableOptions tableOptions = TableOptions.withDefaults();
+ tableOptions.getTableColumns().clear();
+ tableOptions.getTableColumnTypes().clear();
+
+ TenantViewOptions tenantViewOptions = TenantViewOptions.withDefaults();
+ tenantViewOptions.setTableProps(String.format("PHOENIX_TTL=%d",
phoenixTTL));
+
+ // Define the test schema.
+ final SchemaBuilder schemaBuilder = new SchemaBuilder(getUrl());
+
schemaBuilder.withTableOptions(tableOptions).withTenantViewOptions(tenantViewOptions)
+ .build();
+
+ // Define the test data.
+ DataSupplier dataSupplier = new DataSupplier() {
+
+ @Override public List<Object> getValues(int rowIndex) {
+ Random rnd = new Random();
+ String zid = String.format("00A0y000%07d", rowIndex);
+ String col7 = String.format("g%05d", rowIndex +
rnd.nextInt(MAX_ROWS));
+ String col8 = String.format("h%05d", rowIndex +
rnd.nextInt(MAX_ROWS));
+ String col9 = String.format("i%05d", rowIndex +
rnd.nextInt(MAX_ROWS));
+ return Lists.newArrayList(new Object[] { zid, col7, col8, col9
});
+ }
+ };
+
+ // Create a test data reader/writer for the above schema.
+ DataWriter dataWriter = new BasicDataWriter();
+ DataReader dataReader = new BasicDataReader();
+
+ List<String> columns = Lists.newArrayList("ZID", "COL7", "COL8",
"COL9");
+ List<String> rowKeyColumns = Lists.newArrayList("ZID");
+ String tenantConnectUrl =
+ getUrl() + ';' + TENANT_ID_ATTRIB + '=' +
schemaBuilder.getDataOptions().getTenantId();
+ try (Connection writeConnection =
DriverManager.getConnection(tenantConnectUrl)) {
+ writeConnection.setAutoCommit(true);
+ dataWriter.setConnection(writeConnection);
+ dataWriter.setDataSupplier(dataSupplier);
+ dataWriter.setUpsertColumns(columns);
+ dataWriter.setRowKeyColumns(rowKeyColumns);
+
dataWriter.setTargetEntity(schemaBuilder.getEntityTenantViewName());
+
+ dataReader.setValidationColumns(columns);
+ dataReader.setRowKeyColumns(rowKeyColumns);
+ dataReader.setDML(String.format("SELECT %s from %s",
Joiner.on(",").join(columns),
+ schemaBuilder.getEntityTenantViewName()));
+
dataReader.setTargetEntity(schemaBuilder.getEntityTenantViewName());
+
+ // Validate data before and after ttl expiration.
+ upsertDataAndRunValidations(phoenixTTL, DEFAULT_NUM_ROWS,
dataWriter, dataReader,
+ schemaBuilder);
+ }
+
+ long scnTimestamp = EnvironmentEdgeManager.currentTimeMillis() +
(phoenixTTL * 1000);
+ // Delete expired rows.
+ deleteData(schemaBuilder, scnTimestamp);
+
+ // Verify after deleting TTL expired data.
+ Properties props = new Properties();
+ props.setProperty("CurrentSCN", Long.toString(scnTimestamp));
+
+ try (Connection readConnection =
DriverManager.getConnection(tenantConnectUrl, props)) {
+
+ dataReader.setConnection(readConnection);
+ com.google.common.collect.Table<String, String, Object>
+ fetchedData =
+ fetchData(dataReader);
+ assertTrue("Deleted rows should not be fetched",
fetchedData.rowKeySet().size() == 0);
+ }
+ }
+
+ private void upsertDataAndRunValidations(long phoenixTTL, int
numRowsToUpsert,
+ DataWriter dataWriter, DataReader dataReader, SchemaBuilder
schemaBuilder)
+ throws IOException, SQLException {
+
+ //Insert for the first time and validate them.
+ validateExpiredRowsAreNotReturnedUsingData(phoenixTTL,
upsertData(dataWriter, numRowsToUpsert),
+ dataReader, schemaBuilder);
+
+ // Update the above rows and validate the same.
+ validateExpiredRowsAreNotReturnedUsingData(phoenixTTL,
upsertData(dataWriter, numRowsToUpsert),
+ dataReader, schemaBuilder);
+
+ }
+
+ private void validateExpiredRowsAreNotReturnedUsingCounts(long phoenixTTL,
DataReader dataReader,
+ SchemaBuilder schemaBuilder) throws SQLException {
+
+ String tenantConnectUrl =
+ getUrl() + ';' + TENANT_ID_ATTRIB + '=' +
schemaBuilder.getDataOptions().getTenantId();
+
+ // Verify before TTL expiration
+ try (Connection readConnection =
DriverManager.getConnection(tenantConnectUrl)) {
+
+ dataReader.setConnection(readConnection);
+ com.google.common.collect.Table<String, String, Object>
+ fetchedData =
+ fetchData(dataReader);
+ assertTrue("Fetched data should not be null", fetchedData != null);
+ assertTrue("Rows should exists before expiration",
fetchedData.rowKeySet().size() > 0);
+ }
+
+ // Verify after TTL expiration
+ long scnTimestamp = EnvironmentEdgeManager.currentTimeMillis();
+ Properties props = new Properties();
+ props.setProperty("CurrentSCN", Long.toString(scnTimestamp + (2 *
phoenixTTL * 1000)));
+ try (Connection readConnection =
DriverManager.getConnection(tenantConnectUrl, props)) {
+
+ dataReader.setConnection(readConnection);
+ com.google.common.collect.Table<String, String, Object>
+ fetchedData =
+ fetchData(dataReader);
+ assertTrue("Fetched data should not be null", fetchedData != null);
+ assertTrue("Expired rows should not be fetched",
fetchedData.rowKeySet().size() == 0);
+ }
+ }
+
+ private void validateExpiredRowsAreNotReturnedUsingData(long phoenixTTL,
+ com.google.common.collect.Table<String, String, Object>
upsertedData,
+ DataReader dataReader, SchemaBuilder schemaBuilder) throws
SQLException {
+
+ String tenantConnectUrl =
+ getUrl() + ';' + TENANT_ID_ATTRIB + '=' +
schemaBuilder.getDataOptions().getTenantId();
+
+ // Verify before TTL expiration
+ long scnTimestamp = EnvironmentEdgeManager.currentTimeMillis();
+ Properties props = new Properties();
+ props.setProperty("CurrentSCN", Long.toString(scnTimestamp));
+ try (Connection readConnection =
DriverManager.getConnection(tenantConnectUrl, props)) {
+
+ dataReader.setConnection(readConnection);
+ com.google.common.collect.Table<String, String, Object>
+ fetchedData =
+ fetchData(dataReader);
+ assertTrue("Upserted data should not be null", upsertedData !=
null);
+ assertTrue("Fetched data should not be null", fetchedData != null);
+
+ verifyRowsBeforeTTLExpiration(upsertedData, fetchedData);
+ }
+
+ // Verify after TTL expiration
+ props.setProperty("CurrentSCN", Long.toString(scnTimestamp + (2 *
phoenixTTL * 1000)));
+ try (Connection readConnection =
DriverManager.getConnection(tenantConnectUrl, props)) {
+
+ dataReader.setConnection(readConnection);
+ com.google.common.collect.Table<String, String, Object>
+ fetchedData =
+ fetchData(dataReader);
+ assertTrue("Fetched data should not be null", fetchedData != null);
+ assertTrue("Expired rows should not be fetched",
fetchedData.rowKeySet().size() == 0);
+ }
+
+ }
+
+ private void validateRowsAreNotMaskedUsingCounts(long probeTimestamp,
DataReader dataReader,
+ SchemaBuilder schemaBuilder) throws SQLException {
+
+ String tenantConnectUrl =
+ getUrl() + ';' + TENANT_ID_ATTRIB + '=' +
schemaBuilder.getDataOptions()
+ .getTenantId();
+
+ // Verify rows exists (not masked) at current time
+ long scnTimestamp = EnvironmentEdgeManager.currentTimeMillis();
+ Properties props = new Properties();
+ props.setProperty("CurrentSCN", Long.toString(scnTimestamp ));
+ try (Connection readConnection =
DriverManager.getConnection(tenantConnectUrl, props)) {
+
+ dataReader.setConnection(readConnection);
+ com.google.common.collect.Table<String, String, Object>
+ fetchedData =
+ fetchData(dataReader);
+ assertTrue("Fetched data should not be null", fetchedData != null);
+ assertTrue("Rows should exists before ttl expiration (now)",
+ fetchedData.rowKeySet().size() > 0);
+ }
+
+ // Verify rows exists (not masked) at probed timestamp
+ props.setProperty("CurrentSCN", Long.toString(probeTimestamp));
+ try (Connection readConnection =
DriverManager.getConnection(tenantConnectUrl, props)) {
+
+ dataReader.setConnection(readConnection);
+ com.google.common.collect.Table<String, String, Object>
+ fetchedData =
+ fetchData(dataReader);
+ assertTrue("Fetched data should not be null", fetchedData != null);
+ assertTrue("Rows should exists before ttl expiration
(probe-timestamp)",
+ fetchedData.rowKeySet().size() > 0);
+ }
+ }
+
+ private void verifyRowsBeforeTTLExpiration(
+ com.google.common.collect.Table<String, String, Object>
upsertedData,
+ com.google.common.collect.Table<String, String, Object>
fetchedData) {
+
+ Set<String> upsertedRowKeys = upsertedData.rowKeySet();
+ Set<String> fetchedRowKeys = fetchedData.rowKeySet();
+ assertTrue("Upserted row keys should not be null", upsertedRowKeys !=
null);
+ assertTrue("Fetched row keys should not be null", fetchedRowKeys !=
null);
+ assertTrue(String.format("Rows upserted and fetched do not match,
upserted=%d, fetched=%d",
+ upsertedRowKeys.size(), fetchedRowKeys.size()),
+ upsertedRowKeys.equals(fetchedRowKeys));
+
+ Set<String> fetchedCols = fetchedData.columnKeySet();
+ for (String rowKey : fetchedRowKeys) {
+ for (String columnKey : fetchedCols) {
+ Object upsertedValue = upsertedData.get(rowKey, columnKey);
+ Object fetchedValue = fetchedData.get(rowKey, columnKey);
+ assertTrue("Upserted values should not be null", upsertedValue
!= null);
+ assertTrue("Fetched values should not be null", fetchedValue
!= null);
+ assertTrue("Values upserted and fetched do not match",
+ upsertedValue.equals(fetchedValue));
+ }
+ }
+ }
+
+ private com.google.common.collect.Table<String, String, Object> upsertData(
+ DataWriter dataWriter, int numRowsToUpsert) throws SQLException {
+ // Upsert rows
+ dataWriter.upsertRows(1, numRowsToUpsert);
+ return dataWriter.getDataTable();
+ }
+
+ private com.google.common.collect.Table<String, String, Object>
fetchData(DataReader dataReader)
+ throws SQLException {
+
+ dataReader.readRows();
+ return dataReader.getDataTable();
+ }
+
+ private void deleteData(SchemaBuilder schemaBuilder, long scnTimestamp)
throws SQLException {
+
+ String viewName = schemaBuilder.getEntityTenantViewName();
+
+ Properties props = new Properties();
+ props.setProperty("CurrentSCN", Long.toString(scnTimestamp));
+ String tenantConnectUrl =
+ getUrl() + ';' + TENANT_ID_ATTRIB + '=' +
schemaBuilder.getDataOptions().getTenantId();
+
+ try (Connection deleteConnection =
DriverManager.getConnection(tenantConnectUrl, props);
+ final Statement statement =
deleteConnection.createStatement()) {
+ deleteConnection.setAutoCommit(true);
+
+ final String deleteIfExpiredStatement = String.format("select *
from %s", viewName);
+ Preconditions.checkNotNull(deleteIfExpiredStatement);
+
+ final PhoenixStatement pstmt =
statement.unwrap(PhoenixStatement.class);
+ // Optimize the query plan so that we potentially use secondary
indexes
+ final QueryPlan queryPlan =
pstmt.optimizeQuery(deleteIfExpiredStatement);
+ final Scan scan = queryPlan.getContext().getScan();
+
+ PTable table = PhoenixRuntime
+ .getTable(deleteConnection,
schemaBuilder.getDataOptions().getTenantId(), viewName);
+
+ byte[] emptyColumnFamilyName =
SchemaUtil.getEmptyColumnFamily(table);
+ byte[] emptyColumnName =
+ table.getEncodingScheme() ==
PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS ?
+ QueryConstants.EMPTY_COLUMN_BYTES :
+
table.getEncodingScheme().encode(QueryConstants.ENCODED_EMPTY_COLUMN_NAME);
+
+
scan.setAttribute(BaseScannerRegionObserver.EMPTY_COLUMN_FAMILY_NAME,
emptyColumnFamilyName);
+
scan.setAttribute(BaseScannerRegionObserver.EMPTY_COLUMN_QUALIFIER_NAME,
emptyColumnName);
+
scan.setAttribute(BaseScannerRegionObserver.DELETE_PHOENIX_TTL_EXPIRED,
PDataType.TRUE_BYTES);
+
scan.setAttribute(BaseScannerRegionObserver.MASK_PHOENIX_TTL_EXPIRED,
PDataType.FALSE_BYTES);
+ scan.setAttribute(BaseScannerRegionObserver.PHOENIX_TTL,
Bytes.toBytes(Long.valueOf(table.getPhoenixTTL())));
+ PhoenixResultSet
+ rs = pstmt.newResultSet(queryPlan.iterator(),
queryPlan.getProjector(), queryPlan.getContext());
Review comment:
@jpisaac can you address this comment please?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
> Add a new Coprocessor - ViewTTLAware Coprocessor
> ------------------------------------------------
>
> Key: PHOENIX-5601
> URL: https://issues.apache.org/jira/browse/PHOENIX-5601
> Project: Phoenix
> Issue Type: Sub-task
> Affects Versions: 5.1.0, 4.16.0
> Reporter: Jacob Isaac
> Assignee: Jacob Isaac
> Priority: Major
> Attachments: PHOENIX-5601.4.x-HBase-1.3.008.patch,
> PHOENIX-5601.master.008.patch
>
>
> * Add a New coprocessor - ViewTTLAware Coprocessor that will intercept
> scan/get requests to inject a new ViewTTLAware scanner.
> The scanner will -
> * Use the row timestamp of the empty column to determine whether row TTL
> has expired and mask the rows from underlying query results.
> * Use the row timestamp to delete expired rows when DELETE_VIEW_TTL_EXPIRED
> flag is present.
--
This message was sent by Atlassian Jira
(v8.3.4#803005)