ChinmaySKulkarni commented on a change in pull request #955: URL: https://github.com/apache/phoenix/pull/955#discussion_r523158369
########## File path: phoenix-core/src/main/java/org/apache/phoenix/coprocessor/PhoenixTTLRegionObserver.java ########## @@ -0,0 +1,368 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.coprocessor; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CoprocessorEnvironment; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver; +import org.apache.hadoop.hbase.coprocessor.ObserverContext; +import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; +import org.apache.hadoop.hbase.io.TimeRange; +import org.apache.hadoop.hbase.regionserver.Region; +import org.apache.hadoop.hbase.regionserver.RegionScanner; +import org.apache.hadoop.hbase.regionserver.ScannerContext; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.phoenix.coprocessor.metrics.MetricsPhoenixCoprocessorSourceFactory; +import org.apache.phoenix.coprocessor.metrics.MetricsPhoenixTTLSource; +import org.apache.phoenix.util.EnvironmentEdgeManager; +import org.apache.phoenix.util.ScanUtil; +import org.apache.phoenix.util.ServerUtil; + +import java.io.IOException; +import java.util.Iterator; +import java.util.List; + +import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.EMPTY_COLUMN_FAMILY_NAME; +import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.EMPTY_COLUMN_QUALIFIER_NAME; + +/** + * Coprocessor that checks whether the row is expired based on the TTL spec. + */ +public class PhoenixTTLRegionObserver extends BaseRegionObserver { + private static final Log LOG = LogFactory.getLog(PhoenixTTLRegionObserver.class); + private MetricsPhoenixTTLSource metricSource; + + @Override public void start(CoprocessorEnvironment e) throws IOException { + super.start(e); + metricSource = MetricsPhoenixCoprocessorSourceFactory.getInstance().getPhoenixTTLSource(); + } + + @Override public void stop(CoprocessorEnvironment e) throws IOException { + super.stop(e); + } + + @Override + public RegionScanner postScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c, Scan scan, + RegionScanner s) throws IOException { + + if (!ScanUtil.isMaskTTLExpiredRows(scan) && !ScanUtil.isDeleteTTLExpiredRows(scan)) { + return s; + } + + if (ScanUtil.isMaskTTLExpiredRows(scan)) { + metricSource.incrementMaskExpiredRequestCount(); + } + + if (ScanUtil.isDeleteTTLExpiredRows(scan)) { + metricSource.incrementDeleteExpiredRequestCount(); + } + + scan.setAttribute(PhoenixTTLRegionScanner.MASK_PHOENIX_TTL_EXPIRED_REQUEST_ID_ATTR, + Bytes.toBytes(metricSource.getMaskExpiredRequestCount())); + + LOG.debug(String.format( + "********** PHOENIX-TTL: PhoenixTTLRegionObserver::postScannerOpen TTL for table = " + + "[%s], scan = [%s], PHOENIX_TTL = %d ***************, numRequests=%d", + s.getRegionInfo().getTable().getNameAsString(), scan.toJSON(Integer.MAX_VALUE), + ScanUtil.getPhoenixTTL(scan), metricSource.getMaskExpiredRequestCount())); + return new PhoenixTTLRegionScanner(c.getEnvironment(), scan, s); + } + + /** + * A region scanner that checks the TTL expiration of rows + */ + private static class PhoenixTTLRegionScanner implements RegionScanner { + private static final String MASK_PHOENIX_TTL_EXPIRED_REQUEST_ID_ATTR = + "MASK_PHOENIX_TTL_EXPIRED_REQUEST_ID"; + + private final RegionScanner scanner; + private final Scan scan; + private final byte[] emptyCF; + private final byte[] emptyCQ; + private final Region region; + private final long minTimestamp; + private final long maxTimestamp; + private final long now; + private final boolean deleteIfExpired; + private final boolean maskIfExpired; + private final long requestId; + private long numRowsExpired; + private long numRowsScanned; + private long numRowsDeleted; + private boolean reported = false; + + public PhoenixTTLRegionScanner(RegionCoprocessorEnvironment env, Scan scan, + RegionScanner scanner) throws IOException { + this.scan = scan; + this.scanner = scanner; + byte[] requestIdBytes = scan.getAttribute(MASK_PHOENIX_TTL_EXPIRED_REQUEST_ID_ATTR); + this.requestId = Bytes.toLong(requestIdBytes); + + deleteIfExpired = ScanUtil.isDeleteTTLExpiredRows(scan); + maskIfExpired = !deleteIfExpired && ScanUtil.isMaskTTLExpiredRows(scan); + + region = env.getRegion(); + emptyCF = scan.getAttribute(EMPTY_COLUMN_FAMILY_NAME); + emptyCQ = scan.getAttribute(EMPTY_COLUMN_QUALIFIER_NAME); + byte[] txnScn = scan.getAttribute(BaseScannerRegionObserver.TX_SCN); + if (txnScn != null) { + TimeRange timeRange = scan.getTimeRange(); + scan.setTimeRange(timeRange.getMin(), Bytes.toLong(txnScn)); + } + minTimestamp = scan.getTimeRange().getMin(); + maxTimestamp = scan.getTimeRange().getMax(); + now = maxTimestamp != HConstants.LATEST_TIMESTAMP ? + maxTimestamp : + EnvironmentEdgeManager.currentTimeMillis(); + } + + @Override public int getBatch() { + return scanner.getBatch(); + } + + @Override public long getMaxResultSize() { + return scanner.getMaxResultSize(); + } + + @Override public boolean next(List<Cell> result) throws IOException { + try { + boolean hasMore; + do { + hasMore = scanner.next(result); + if (result.isEmpty()) { + break; + } + numRowsScanned++; + if (maskIfExpired && checkRowNotExpired(result)) { + break; + } + + if (deleteIfExpired && deleteRowIfExpired(result)) { + numRowsDeleted++; + break; + } + // skip this row + // 1. if the row has expired (checkRowNotExpired returned false) + // 2. if the row was not deleted (deleteRowIfExpired returned false and + // do not want it to count towards the deleted count) + if (maskIfExpired) { + numRowsExpired++; + } + result.clear(); + } while (hasMore); + return hasMore; + } catch (Throwable t) { + ServerUtil.throwIOException(region.getRegionInfo().getRegionNameAsString(), t); + return false; // impossible + } + } + + @Override public boolean next(List<Cell> result, ScannerContext scannerContext) + throws IOException { + throw new IOException( + "next with scannerContext should not be called in Phoenix environment"); + } + + @Override public boolean nextRaw(List<Cell> result, ScannerContext scannerContext) + throws IOException { + throw new IOException( + "NextRaw with scannerContext should not be called in Phoenix environment"); + } + + @Override public void close() throws IOException { + if (!reported) { + LOG.debug(String.format( + "***** PHOENIX-TTL-SCAN-STATS-CLOSE: " + "request-id:[%d] = [%d, %d, %d]", + this.requestId, this.numRowsScanned, this.numRowsExpired, + this.numRowsDeleted)); + reported = true; + } + scanner.close(); + } + + @Override public HRegionInfo getRegionInfo() { + return scanner.getRegionInfo(); + } + + @Override public boolean isFilterDone() throws IOException { + return scanner.isFilterDone(); + } + + @Override public boolean reseek(byte[] row) throws IOException { + return scanner.reseek(row); + } + + @Override public long getMvccReadPoint() { + return scanner.getMvccReadPoint(); + } + + @Override public boolean nextRaw(List<Cell> result) throws IOException { + try { + boolean hasMore; + do { + hasMore = scanner.nextRaw(result); + if (result.isEmpty()) { + break; + } + numRowsScanned++; + if (maskIfExpired && checkRowNotExpired(result)) { + break; + } + + if (deleteIfExpired && deleteRowIfExpired(result)) { + numRowsDeleted++; + break; + } + // skip this row + // 1. if the row has expired (checkRowNotExpired returned false) + // 2. if the row was not deleted (deleteRowIfExpired returned false and + // do not want it to count towards the deleted count) + if (maskIfExpired) { + numRowsExpired++; + } + result.clear(); + } while (hasMore); + return hasMore; + } catch (Throwable t) { + ServerUtil.throwIOException(region.getRegionInfo().getRegionNameAsString(), t); + return false; // impossible + } + } + + /** + * @param cellList is an input and output parameter and will either include a valid row or be an empty list + * @return true if row expired and deleted or empty, otherwise false + * @throws IOException + */ + private boolean deleteRowIfExpired(List<Cell> cellList) throws IOException { + + long cellListSize = cellList.size(); + if (cellListSize == 0) { + return true; + } + + Iterator<Cell> cellIterator = cellList.iterator(); + Cell firstCell = cellIterator.next(); + byte[] rowKey = new byte[firstCell.getRowLength()]; + System.arraycopy(firstCell.getRowArray(), firstCell.getRowOffset(), rowKey, 0, + firstCell.getRowLength()); + + boolean isRowExpired = !checkRowNotExpired(cellList); + if (isRowExpired) { + long ttl = ScanUtil.getPhoenixTTL(this.scan); + long ts = getMaxTimestamp(cellList); + LOG.debug(String.format( + "** PHOENIX-TTL: Deleting region = %s, row = %s, delete-ts = %d, max-ts = %d ** ", + region.getRegionInfo().getTable().getNameAsString(), Bytes.toString(rowKey), + now - ttl, ts)); + Delete del = new Delete(rowKey, now - ttl); + Mutation[] mutations = new Mutation[] { del }; + region.batchMutate(mutations, HConstants.NO_NONCE, HConstants.NO_NONCE); + return true; + } + return false; + } + + private boolean isEmptyColumn(Cell cell) { + return Bytes.compareTo(cell.getFamilyArray(), cell.getFamilyOffset(), + cell.getFamilyLength(), emptyCF, 0, emptyCF.length) == 0 && + Bytes.compareTo(cell.getQualifierArray(), cell.getQualifierOffset(), + cell.getQualifierLength(), emptyCQ, 0, emptyCQ.length) == 0; + } + + // TODO : Remove it after we verify all SQLs include the empty column. + // Before we added ScanUtil.addEmptyColumnToScan + // some queries like select count(*) did not include the empty column in scan, + // thus this method was the fallback in those cases. + private boolean checkEmptyColumnNotExpired(byte[] rowKey) throws IOException { + LOG.warn("Scan " + scan + " did not return the empty column for " + region + .getRegionInfo().getTable().getNameAsString()); + Get get = new Get(rowKey); + get.setTimeRange(minTimestamp, maxTimestamp); + get.addColumn(emptyCF, emptyCQ); + Result result = region.get(get); + if (result.isEmpty()) { + LOG.warn("The empty column does not exist in a row in " + region.getRegionInfo() + .getTable().getNameAsString()); + return false; + } + return !isTTLExpired(result.getColumnLatestCell(emptyCF, emptyCQ)); + } + + /** + * @param cellList is an input and output parameter and will either include a valid row + * or be an empty list + * @return true if row not expired, otherwise false + * @throws IOException + */ + private boolean checkRowNotExpired(List<Cell> cellList) throws IOException { Review comment: @jpisaac can we please add these tests? ########## File path: phoenix-core/src/test/java/org/apache/phoenix/util/ScanUtilTest.java ########## @@ -466,4 +482,78 @@ public SortOrder getSortOrder() { assertArrayEquals(expectedEndKey, endKey); } } + + public static class PhoenixTTLScanUtilTest extends BaseConnectionlessQueryTest { Review comment: Nice tests @jpisaac ! ########## File path: phoenix-core/src/main/java/org/apache/phoenix/coprocessor/metrics/MetricsPhoenixCoprocessorSourceFactory.java ########## @@ -0,0 +1,20 @@ +package org.apache.phoenix.coprocessor.metrics; + Review comment: @jpisaac can we address this please? ########## File path: phoenix-core/src/main/java/org/apache/phoenix/coprocessor/metrics/MetricsPhoenixCoprocessorSourceFactory.java ########## @@ -0,0 +1,20 @@ +package org.apache.phoenix.coprocessor.metrics; + +public class MetricsPhoenixCoprocessorSourceFactory { + + private static final MetricsPhoenixCoprocessorSourceFactory + INSTANCE = new MetricsPhoenixCoprocessorSourceFactory(); + private MetricsPhoenixTTLSource phoenixTTLSource; + + public static MetricsPhoenixCoprocessorSourceFactory getInstance() { + return INSTANCE; + } + + public synchronized MetricsPhoenixTTLSource getPhoenixTTLSource() { Review comment: @jpisaac can we address this please? ########## File path: phoenix-core/src/it/java/org/apache/phoenix/end2end/ViewTTLIT.java ########## @@ -608,4 +635,1353 @@ private SchemaBuilder createLevel1TenantView(TenantViewOptions tenantViewOptions assertSyscatHavePhoenixTTLRelatedColumns(tenantId, schemaName, indexOnTenantViewName, PTableType.INDEX.getSerializedValue(), 300000); } + + + @Test public void testWithTenantViewAndNoGlobalView() throws Exception { + // PHOENIX TTL is set in seconds (for e.g 10 secs) + long phoenixTTL = 10; + TableOptions tableOptions = TableOptions.withDefaults(); + tableOptions.getTableColumns().clear(); + tableOptions.getTableColumnTypes().clear(); + + TenantViewOptions tenantViewOptions = TenantViewOptions.withDefaults(); + tenantViewOptions.setTableProps(String.format("PHOENIX_TTL=%d", phoenixTTL)); + + // Define the test schema. + final SchemaBuilder schemaBuilder = new SchemaBuilder(getUrl()); + schemaBuilder.withTableOptions(tableOptions).withTenantViewOptions(tenantViewOptions) + .build(); + + // Define the test data. + DataSupplier dataSupplier = new DataSupplier() { + + @Override public List<Object> getValues(int rowIndex) { + Random rnd = new Random(); + String zid = String.format("00A0y000%07d", rowIndex); + String col7 = String.format("g%05d", rowIndex + rnd.nextInt(MAX_ROWS)); + String col8 = String.format("h%05d", rowIndex + rnd.nextInt(MAX_ROWS)); + String col9 = String.format("i%05d", rowIndex + rnd.nextInt(MAX_ROWS)); + return Lists.newArrayList(new Object[] { zid, col7, col8, col9 }); + } + }; + + // Create a test data reader/writer for the above schema. + DataWriter dataWriter = new BasicDataWriter(); + DataReader dataReader = new BasicDataReader(); + + List<String> columns = Lists.newArrayList("ZID", "COL7", "COL8", "COL9"); + List<String> rowKeyColumns = Lists.newArrayList("ZID"); + String + tenantConnectUrl = + getUrl() + ';' + TENANT_ID_ATTRIB + '=' + schemaBuilder.getDataOptions().getTenantId(); + try (Connection writeConnection = DriverManager.getConnection(tenantConnectUrl)) { + writeConnection.setAutoCommit(true); + dataWriter.setConnection(writeConnection); + dataWriter.setDataSupplier(dataSupplier); + dataWriter.setUpsertColumns(columns); + dataWriter.setRowKeyColumns(rowKeyColumns); + dataWriter.setTargetEntity(schemaBuilder.getEntityTenantViewName()); + + dataReader.setValidationColumns(columns); + dataReader.setRowKeyColumns(rowKeyColumns); + dataReader.setDML(String.format("SELECT %s from %s", Joiner.on(",").join(columns), + schemaBuilder.getEntityTenantViewName())); + dataReader.setTargetEntity(schemaBuilder.getEntityTenantViewName()); + + // Validate data before and after ttl expiration. + upsertDataAndRunValidations(phoenixTTL, DEFAULT_NUM_ROWS, dataWriter, dataReader, + schemaBuilder); + } + } + + @Test public void testWithSQLUsingIndexWithCoveredColsUpdates() throws Exception { + + // PHOENIX TTL is set in seconds (for e.g 10 secs) + long phoenixTTL = 10; + TableOptions tableOptions = TableOptions.withDefaults(); + tableOptions.getTableColumns().clear(); + tableOptions.getTableColumnTypes().clear(); + + GlobalViewOptions globalViewOptions = SchemaBuilder.GlobalViewOptions.withDefaults(); + globalViewOptions.setTableProps(String.format("PHOENIX_TTL=%d", phoenixTTL)); + + GlobalViewIndexOptions + globalViewIndexOptions = + SchemaBuilder.GlobalViewIndexOptions.withDefaults(); + globalViewIndexOptions.setLocal(false); + + TenantViewOptions tenantViewOptions = new TenantViewOptions(); + tenantViewOptions.setTenantViewColumns(asList("ZID", "COL7", "COL8", "COL9")); + tenantViewOptions + .setTenantViewColumnTypes(asList("CHAR(15)", "VARCHAR", "VARCHAR", "VARCHAR")); + + OtherOptions testCaseWhenAllCFMatchAndAllDefault = new OtherOptions(); + testCaseWhenAllCFMatchAndAllDefault.setTestName("testCaseWhenAllCFMatchAndAllDefault"); + testCaseWhenAllCFMatchAndAllDefault + .setTableCFs(Lists.newArrayList((String) null, null, null)); + testCaseWhenAllCFMatchAndAllDefault + .setGlobalViewCFs(Lists.newArrayList((String) null, null, null)); + testCaseWhenAllCFMatchAndAllDefault + .setTenantViewCFs(Lists.newArrayList((String) null, null, null, null)); + + // Define the test schema. + final SchemaBuilder schemaBuilder = new SchemaBuilder(getUrl()); + schemaBuilder.withTableOptions(tableOptions).withGlobalViewOptions(globalViewOptions) + .withGlobalViewIndexOptions(globalViewIndexOptions) + .withTenantViewOptions(tenantViewOptions) + .withOtherOptions(testCaseWhenAllCFMatchAndAllDefault).build(); + + // Define the test data. + final List<String> outerCol4s = Lists.newArrayList(); + DataSupplier dataSupplier = new DataSupplier() { + String col4ForWhereClause; + + @Override public List<Object> getValues(int rowIndex) { + Random rnd = new Random(); + String id = String.format("00A0y000%07d", rowIndex); + String zid = String.format("00B0y000%07d", rowIndex); + String col4 = String.format("d%05d", rowIndex + rnd.nextInt(MAX_ROWS)); + + // Store the col4 data to be used later in a where clause + outerCol4s.add(col4); + String col5 = String.format("e%05d", rowIndex + rnd.nextInt(MAX_ROWS)); + String col6 = String.format("f%05d", rowIndex + rnd.nextInt(MAX_ROWS)); + String col7 = String.format("g%05d", rowIndex + rnd.nextInt(MAX_ROWS)); + String col8 = String.format("h%05d", rowIndex + rnd.nextInt(MAX_ROWS)); + String col9 = String.format("i%05d", rowIndex + rnd.nextInt(MAX_ROWS)); + + return Lists + .newArrayList(new Object[] { id, zid, col4, col5, col6, col7, col8, col9 }); + } + }; + + // Create a test data reader/writer for the above schema. + DataWriter dataWriter = new BasicDataWriter(); + DataReader dataReader = new BasicDataReader(); + + List<String> columns = + Lists.newArrayList("ID", "ZID", "COL4", "COL5", "COL6", "COL7", "COL8", "COL9"); + List<String> rowKeyColumns = Lists.newArrayList("COL6"); + String tenantConnectUrl = + getUrl() + ';' + TENANT_ID_ATTRIB + '=' + schemaBuilder.getDataOptions().getTenantId(); + try (Connection writeConnection = DriverManager.getConnection(tenantConnectUrl)) { + writeConnection.setAutoCommit(true); + dataWriter.setConnection(writeConnection); + dataWriter.setDataSupplier(dataSupplier); + dataWriter.setUpsertColumns(columns); + dataWriter.setRowKeyColumns(rowKeyColumns); + dataWriter.setTargetEntity(schemaBuilder.getEntityTenantViewName()); + + // Upsert data for validation + upsertData(dataWriter, DEFAULT_NUM_ROWS); + + dataReader.setValidationColumns(rowKeyColumns); + dataReader.setRowKeyColumns(rowKeyColumns); + dataReader.setDML(String.format("SELECT col6 from %s where col4 = '%s'", + schemaBuilder.getEntityTenantViewName(), outerCol4s.get(1))); + dataReader.setTargetEntity(schemaBuilder.getEntityTenantViewName()); + + // Validate data before and after ttl expiration. + validateExpiredRowsAreNotReturnedUsingCounts(phoenixTTL, dataReader, schemaBuilder); + } + } + + /** + * Ensure/validate that empty columns for the index are still updated even when covered columns + * are not updated. + * + * @throws Exception + */ + @Test public void testWithSQLUsingIndexAndNoCoveredColsUpdates() throws Exception { + + // PHOENIX TTL is set in seconds (for e.g 10 secs) + long phoenixTTL = 10; + TableOptions tableOptions = TableOptions.withDefaults(); + tableOptions.getTableColumns().clear(); + tableOptions.getTableColumnTypes().clear(); + + GlobalViewOptions globalViewOptions = SchemaBuilder.GlobalViewOptions.withDefaults(); + globalViewOptions.setTableProps(String.format("PHOENIX_TTL=%d", phoenixTTL)); + + GlobalViewIndexOptions globalViewIndexOptions = + SchemaBuilder.GlobalViewIndexOptions.withDefaults(); + globalViewIndexOptions.setLocal(false); + + TenantViewOptions tenantViewOptions = new TenantViewOptions(); + tenantViewOptions.setTenantViewColumns(asList("ZID", "COL7", "COL8", "COL9")); + tenantViewOptions + .setTenantViewColumnTypes(asList("CHAR(15)", "VARCHAR", "VARCHAR", "VARCHAR")); + + OtherOptions testCaseWhenAllCFMatchAndAllDefault = new OtherOptions(); + testCaseWhenAllCFMatchAndAllDefault.setTestName("testCaseWhenAllCFMatchAndAllDefault"); + testCaseWhenAllCFMatchAndAllDefault + .setTableCFs(Lists.newArrayList((String) null, null, null)); + testCaseWhenAllCFMatchAndAllDefault + .setGlobalViewCFs(Lists.newArrayList((String) null, null, null)); + testCaseWhenAllCFMatchAndAllDefault + .setTenantViewCFs(Lists.newArrayList((String) null, null, null, null)); + + // Define the test schema. + final SchemaBuilder schemaBuilder = new SchemaBuilder(getUrl()); + schemaBuilder.withTableOptions(tableOptions).withGlobalViewOptions(globalViewOptions) + .withGlobalViewIndexOptions(globalViewIndexOptions) + .withTenantViewOptions(tenantViewOptions) + .withOtherOptions(testCaseWhenAllCFMatchAndAllDefault).build(); + + // Define the test data. + final List<String> outerCol4s = Lists.newArrayList(); + DataSupplier dataSupplier = new DataSupplier() { + + @Override public List<Object> getValues(int rowIndex) { + Random rnd = new Random(); + String id = String.format("00A0y000%07d", rowIndex); + String zid = String.format("00B0y000%07d", rowIndex); + String col4 = String.format("d%05d", rowIndex + rnd.nextInt(MAX_ROWS)); + + // Store the col4 data to be used later in a where clause + outerCol4s.add(col4); + String col5 = String.format("e%05d", rowIndex + rnd.nextInt(MAX_ROWS)); + String col6 = String.format("f%05d", rowIndex + rnd.nextInt(MAX_ROWS)); + String col7 = String.format("g%05d", rowIndex + rnd.nextInt(MAX_ROWS)); + String col8 = String.format("h%05d", rowIndex + rnd.nextInt(MAX_ROWS)); + String col9 = String.format("i%05d", rowIndex + rnd.nextInt(MAX_ROWS)); + + return Lists + .newArrayList(new Object[] { id, zid, col4, col5, col6, col7, col8, col9 }); + } + }; + + // Create a test data reader/writer for the above schema. + DataWriter dataWriter = new BasicDataWriter(); + DataReader dataReader = new BasicDataReader(); + + List<String> columns = + Lists.newArrayList("ID", "ZID", "COL4", "COL5", "COL6", "COL7", "COL8", "COL9"); + List<String> nonCoveredColumns = + Lists.newArrayList("ID", "ZID", "COL5", "COL7", "COL8", "COL9"); + List<String> rowKeyColumns = Lists.newArrayList("COL6"); + String tenantConnectUrl = + getUrl() + ';' + TENANT_ID_ATTRIB + '=' + schemaBuilder.getDataOptions().getTenantId(); + try (Connection writeConnection = DriverManager.getConnection(tenantConnectUrl)) { + writeConnection.setAutoCommit(true); + dataWriter.setConnection(writeConnection); + dataWriter.setDataSupplier(dataSupplier); + dataWriter.setUpsertColumns(columns); + dataWriter.setRowKeyColumns(rowKeyColumns); + dataWriter.setTargetEntity(schemaBuilder.getEntityTenantViewName()); + + // Upsert data for validation + upsertData(dataWriter, DEFAULT_NUM_ROWS); + + dataReader.setValidationColumns(rowKeyColumns); + dataReader.setRowKeyColumns(rowKeyColumns); + dataReader.setDML(String.format("SELECT col6 from %s where col4 = '%s'", + schemaBuilder.getEntityTenantViewName(), outerCol4s.get(1))); + dataReader.setTargetEntity(schemaBuilder.getEntityTenantViewName()); + + // Validate data before and after ttl expiration. + validateExpiredRowsAreNotReturnedUsingCounts(phoenixTTL, dataReader, schemaBuilder); + + // Now update the above data but not modifying the covered columns. + // Ensure/validate that empty columns for the index are still updated. + + // Data supplier where covered and included (col4 and col6) columns are not updated. + DataSupplier dataSupplierForNonCoveredColumns = new DataSupplier() { + + @Override public List<Object> getValues(int rowIndex) { + Random rnd = new Random(); + String id = String.format("00A0y000%07d", rowIndex); + String zid = String.format("00B0y000%07d", rowIndex); + String col5 = String.format("e%05d", rowIndex + rnd.nextInt(MAX_ROWS)); + String col7 = String.format("g%05d", rowIndex + rnd.nextInt(MAX_ROWS)); + String col8 = String.format("h%05d", rowIndex + rnd.nextInt(MAX_ROWS)); + String col9 = String.format("i%05d", rowIndex + rnd.nextInt(MAX_ROWS)); + + return Lists.newArrayList(new Object[] { id, zid, col5, col7, col8, col9 }); + } + }; + + // Upsert data for validation with non covered columns + dataWriter.setDataSupplier(dataSupplierForNonCoveredColumns); + dataWriter.setUpsertColumns(nonCoveredColumns); + upsertData(dataWriter, DEFAULT_NUM_ROWS); + + List<String> rowKeyColumns1 = Lists.newArrayList("ID", "COL6"); + dataReader.setValidationColumns(rowKeyColumns1); + dataReader.setRowKeyColumns(rowKeyColumns1); + dataReader.setDML(String.format("SELECT id, col6 from %s where col4 = '%s'", + schemaBuilder.getEntityTenantViewName(), outerCol4s.get(1))); + + // Validate data before and after ttl expiration. + validateExpiredRowsAreNotReturnedUsingCounts(phoenixTTL, dataReader, schemaBuilder); + + } + } + + + /** + * Ensure/validate that correct parent's phoenix ttl value is used when queries are using the + * index. + * + * @throws Exception + */ + @Test public void testWithSQLUsingIndexAndMultiLevelViews() throws Exception { + + // PHOENIX TTL is set in seconds (for e.g 10 secs) + long phoenixTTL = 10; + TableOptions tableOptions = TableOptions.withDefaults(); + tableOptions.getTableColumns().clear(); + tableOptions.getTableColumnTypes().clear(); + + GlobalViewOptions globalViewOptions = SchemaBuilder.GlobalViewOptions.withDefaults(); + globalViewOptions.setTableProps(String.format("PHOENIX_TTL=%d", phoenixTTL)); + + GlobalViewIndexOptions globalViewIndexOptions = + SchemaBuilder.GlobalViewIndexOptions.withDefaults(); + globalViewIndexOptions.setLocal(false); + + TenantViewOptions tenantViewOptions = new TenantViewOptions(); + tenantViewOptions.setTenantViewColumns(asList("ZID", "COL7", "COL8", "COL9")); + tenantViewOptions + .setTenantViewColumnTypes(asList("CHAR(15)", "VARCHAR", "VARCHAR", "VARCHAR")); + + + OtherOptions testCaseWhenAllCFMatchAndAllDefault = new OtherOptions(); + testCaseWhenAllCFMatchAndAllDefault.setTestName("testCaseWhenAllCFMatchAndAllDefault"); + testCaseWhenAllCFMatchAndAllDefault + .setTableCFs(Lists.newArrayList((String) null, null, null)); + testCaseWhenAllCFMatchAndAllDefault + .setGlobalViewCFs(Lists.newArrayList((String) null, null, null)); + testCaseWhenAllCFMatchAndAllDefault + .setTenantViewCFs(Lists.newArrayList((String) null, null, null, null)); + + // Define the test schema. + final SchemaBuilder schemaBuilder = new SchemaBuilder(getUrl()); + schemaBuilder.withTableOptions(tableOptions).withGlobalViewOptions(globalViewOptions) + .withGlobalViewIndexOptions(globalViewIndexOptions) + .withTenantViewOptions(tenantViewOptions) + .withOtherOptions(testCaseWhenAllCFMatchAndAllDefault).build(); + + + String level3ViewName = String.format("%s.%s", + DEFAULT_SCHEMA_NAME, "E11"); + String level3ViewCreateSQL = String.format("CREATE VIEW IF NOT EXISTS %s AS SELECT * FROM %s", + level3ViewName, + schemaBuilder.getEntityTenantViewName()); + String tConnectUrl = + getUrl() + ';' + TENANT_ID_ATTRIB + '=' + schemaBuilder.getDataOptions().getTenantId(); + try (Connection tConnection = DriverManager.getConnection(tConnectUrl)) { + tConnection.createStatement().execute(level3ViewCreateSQL); + } + + + // Define the test data. + final List<String> outerCol4s = Lists.newArrayList(); + DataSupplier dataSupplier = new DataSupplier() { + + @Override public List<Object> getValues(int rowIndex) { + Random rnd = new Random(); + String id = String.format("00A0y000%07d", rowIndex); + String zid = String.format("00B0y000%07d", rowIndex); + String col4 = String.format("d%05d", rowIndex + rnd.nextInt(MAX_ROWS)); + + // Store the col4 data to be used later in a where clause + outerCol4s.add(col4); + String col5 = String.format("e%05d", rowIndex + rnd.nextInt(MAX_ROWS)); + String col6 = String.format("f%05d", rowIndex + rnd.nextInt(MAX_ROWS)); + String col7 = String.format("g%05d", rowIndex + rnd.nextInt(MAX_ROWS)); + String col8 = String.format("h%05d", rowIndex + rnd.nextInt(MAX_ROWS)); + String col9 = String.format("i%05d", rowIndex + rnd.nextInt(MAX_ROWS)); + + return Lists + .newArrayList(new Object[] { id, zid, col4, col5, col6, col7, col8, col9 }); + } + }; + + // Create a test data reader/writer for the above schema. + DataWriter dataWriter = new BasicDataWriter(); + DataReader dataReader = new BasicDataReader(); + + List<String> columns = + Lists.newArrayList("ID", "ZID", "COL4", "COL5", "COL6", "COL7", "COL8", "COL9"); + List<String> nonCoveredColumns = + Lists.newArrayList("ID", "ZID", "COL5", "COL7", "COL8", "COL9"); + List<String> rowKeyColumns = Lists.newArrayList("COL6"); + String tenantConnectUrl = + getUrl() + ';' + TENANT_ID_ATTRIB + '=' + schemaBuilder.getDataOptions().getTenantId(); + try (Connection writeConnection = DriverManager.getConnection(tenantConnectUrl)) { + writeConnection.setAutoCommit(true); + dataWriter.setConnection(writeConnection); + dataWriter.setDataSupplier(dataSupplier); + dataWriter.setUpsertColumns(columns); + dataWriter.setRowKeyColumns(rowKeyColumns); + dataWriter.setTargetEntity(level3ViewName); + + // Upsert data for validation + upsertData(dataWriter, DEFAULT_NUM_ROWS); + + dataReader.setValidationColumns(rowKeyColumns); + dataReader.setRowKeyColumns(rowKeyColumns); + dataReader.setDML(String.format("SELECT col6 from %s where col4 = '%s'", + level3ViewName, outerCol4s.get(1))); + dataReader.setTargetEntity(level3ViewName); + + // Validate data before and after ttl expiration. + validateExpiredRowsAreNotReturnedUsingCounts(phoenixTTL, dataReader, schemaBuilder); + + // Now update the above data but not modifying the covered columns. + // Ensure/validate that empty columns for the index are still updated. + + // Data supplier where covered and included (col4 and col6) columns are not updated. + DataSupplier dataSupplierForNonCoveredColumns = new DataSupplier() { + + @Override public List<Object> getValues(int rowIndex) { + Random rnd = new Random(); + String id = String.format("00A0y000%07d", rowIndex); + String zid = String.format("00B0y000%07d", rowIndex); + String col5 = String.format("e%05d", rowIndex + rnd.nextInt(MAX_ROWS)); + String col7 = String.format("g%05d", rowIndex + rnd.nextInt(MAX_ROWS)); + String col8 = String.format("h%05d", rowIndex + rnd.nextInt(MAX_ROWS)); + String col9 = String.format("i%05d", rowIndex + rnd.nextInt(MAX_ROWS)); + + return Lists.newArrayList(new Object[] { id, zid, col5, col7, col8, col9 }); + } + }; + + // Upsert data for validation with non covered columns + dataWriter.setDataSupplier(dataSupplierForNonCoveredColumns); + dataWriter.setUpsertColumns(nonCoveredColumns); + upsertData(dataWriter, DEFAULT_NUM_ROWS); + + List<String> rowKeyColumns1 = Lists.newArrayList("ID", "COL6"); + dataReader.setValidationColumns(rowKeyColumns1); + dataReader.setRowKeyColumns(rowKeyColumns1); + dataReader.setDML(String.format("SELECT id, col6 from %s where col4 = '%s'", + level3ViewName, outerCol4s.get(1))); + + // Validate data before and after ttl expiration. + validateExpiredRowsAreNotReturnedUsingCounts(phoenixTTL, dataReader, schemaBuilder); + + } + } + + @Test public void testWithVariousSQLs() throws Exception { + + // PHOENIX TTL is set in seconds (for e.g 10 secs) + long phoenixTTL = 10; + TableOptions tableOptions = TableOptions.withDefaults(); + tableOptions.getTableColumns().clear(); + tableOptions.getTableColumnTypes().clear(); + + GlobalViewOptions globalViewOptions = SchemaBuilder.GlobalViewOptions.withDefaults(); + globalViewOptions.setTableProps(String.format("PHOENIX_TTL=%d", phoenixTTL)); + + GlobalViewIndexOptions globalViewIndexOptions = + SchemaBuilder.GlobalViewIndexOptions.withDefaults(); + globalViewIndexOptions.setLocal(false); + + TenantViewOptions tenantViewOptions = new TenantViewOptions(); + tenantViewOptions.setTenantViewColumns(asList("ZID", "COL7", "COL8", "COL9")); + tenantViewOptions.setTenantViewColumnTypes(asList("CHAR(15)", "VARCHAR", "VARCHAR", "VARCHAR")); + + OtherOptions testCaseWhenAllCFMatchAndAllDefault = new OtherOptions(); + testCaseWhenAllCFMatchAndAllDefault.setTestName("testCaseWhenAllCFMatchAndAllDefault"); + testCaseWhenAllCFMatchAndAllDefault + .setTableCFs(Lists.newArrayList((String) null, null, null)); + testCaseWhenAllCFMatchAndAllDefault + .setGlobalViewCFs(Lists.newArrayList((String) null, null, null)); + testCaseWhenAllCFMatchAndAllDefault + .setTenantViewCFs(Lists.newArrayList((String) null, null, null, null)); + + // Define the test schema. + final SchemaBuilder schemaBuilder = new SchemaBuilder(getUrl()); + schemaBuilder.withTableOptions(tableOptions).withGlobalViewOptions(globalViewOptions) + .withGlobalViewIndexOptions(globalViewIndexOptions) + .withTenantViewOptions(tenantViewOptions) + .withOtherOptions(testCaseWhenAllCFMatchAndAllDefault).build(); + + // Define the test data. + final String groupById = String.format("00A0y000%07d", 0); + DataSupplier dataSupplier = new DataSupplier() { + + @Override public List<Object> getValues(int rowIndex) { + Random rnd = new Random(); + String zid = String.format("00B0y000%07d", rowIndex); + String col4 = String.format("d%05d", rowIndex + rnd.nextInt(MAX_ROWS)); + String col5 = String.format("e%05d", rowIndex + rnd.nextInt(MAX_ROWS)); + String col6 = String.format("f%05d", rowIndex + rnd.nextInt(MAX_ROWS)); + String col7 = String.format("g%05d", rowIndex + rnd.nextInt(MAX_ROWS)); + String col8 = String.format("h%05d", rowIndex + rnd.nextInt(MAX_ROWS)); + String col9 = String.format("i%05d", rowIndex + rnd.nextInt(MAX_ROWS)); + + return Lists.newArrayList( + new Object[] { groupById, zid, col4, col5, col6, col7, col8, col9 }); + } + }; + + // Create a test data reader/writer for the above schema. + DataWriter dataWriter = new BasicDataWriter(); + List<String> columns = + Lists.newArrayList("ID", "ZID", "COL4", "COL5", "COL6", "COL7", "COL8", "COL9"); + List<String> rowKeyColumns = Lists.newArrayList("ID", "ZID"); + String tenantConnectUrl = + getUrl() + ';' + TENANT_ID_ATTRIB + '=' + schemaBuilder.getDataOptions().getTenantId(); + try (Connection writeConnection = DriverManager.getConnection(tenantConnectUrl)) { + writeConnection.setAutoCommit(true); + dataWriter.setConnection(writeConnection); + dataWriter.setDataSupplier(dataSupplier); + dataWriter.setUpsertColumns(columns); + dataWriter.setRowKeyColumns(rowKeyColumns); + dataWriter.setTargetEntity(schemaBuilder.getEntityTenantViewName()); + upsertData(dataWriter, DEFAULT_NUM_ROWS); + + // Case : count(1) sql + DataReader dataReader = new BasicDataReader(); + dataReader.setValidationColumns(Arrays.asList("num_rows")); + dataReader.setRowKeyColumns(Arrays.asList("num_rows")); + dataReader.setDML(String + .format("SELECT count(1) as num_rows from %s HAVING count(1) > 0", + schemaBuilder.getEntityTenantViewName())); + dataReader.setTargetEntity(schemaBuilder.getEntityTenantViewName()); + + // Validate data before and after ttl expiration. + validateExpiredRowsAreNotReturnedUsingCounts(phoenixTTL, dataReader, schemaBuilder); + + // Case : group by sql + dataReader.setValidationColumns(Arrays.asList("num_rows")); + dataReader.setRowKeyColumns(Arrays.asList("num_rows")); + dataReader.setDML(String + .format("SELECT count(1) as num_rows from %s GROUP BY ID HAVING count(1) > 0", + schemaBuilder.getEntityTenantViewName(), groupById)); + + dataReader.setTargetEntity(schemaBuilder.getEntityTenantViewName()); + + // Validate data before and after ttl expiration. + validateExpiredRowsAreNotReturnedUsingCounts(phoenixTTL, dataReader, schemaBuilder); + } + } + + @Test public void testWithVariousSQLsForMultipleTenants() throws Exception { + + // PHOENIX TTL is set in seconds (for e.g 10 secs) + long phoenixTTL = 10; + TableOptions tableOptions = TableOptions.withDefaults(); + tableOptions.getTableColumns().clear(); + tableOptions.getTableColumnTypes().clear(); + + GlobalViewOptions globalViewOptions = SchemaBuilder.GlobalViewOptions.withDefaults(); + globalViewOptions.setTableProps(String.format("PHOENIX_TTL=%d", phoenixTTL)); + + GlobalViewIndexOptions globalViewIndexOptions = + SchemaBuilder.GlobalViewIndexOptions.withDefaults(); + globalViewIndexOptions.setLocal(false); + + TenantViewOptions tenantViewOptions = new TenantViewOptions(); + tenantViewOptions.setTenantViewColumns(asList("ZID", "COL7", "COL8", "COL9")); + tenantViewOptions + .setTenantViewColumnTypes(asList("CHAR(15)", "VARCHAR", "VARCHAR", "VARCHAR")); + + OtherOptions testCaseWhenAllCFMatchAndAllDefault = new OtherOptions(); + testCaseWhenAllCFMatchAndAllDefault.setTestName("testCaseWhenAllCFMatchAndAllDefault"); + testCaseWhenAllCFMatchAndAllDefault + .setTableCFs(Lists.newArrayList((String) null, null, null)); + testCaseWhenAllCFMatchAndAllDefault + .setGlobalViewCFs(Lists.newArrayList((String) null, null, null)); + testCaseWhenAllCFMatchAndAllDefault + .setTenantViewCFs(Lists.newArrayList((String) null, null, null, null)); + + final SchemaBuilder schemaBuilder = new SchemaBuilder(getUrl()); + schemaBuilder.withTableOptions(tableOptions).withGlobalViewOptions(globalViewOptions) + .withGlobalViewIndexOptions(globalViewIndexOptions) + .withTenantViewOptions(tenantViewOptions) + .withOtherOptions(testCaseWhenAllCFMatchAndAllDefault); + + for (int tenant : Arrays.asList(new Integer[] { 1, 2, 3 })) { + // build schema for tenant + schemaBuilder.buildWithNewTenant(); + + // Define the test data. + final String groupById = String.format("00A0y000%07d", 0); + DataSupplier dataSupplier = new DataSupplier() { + + @Override public List<Object> getValues(int rowIndex) { + Random rnd = new Random(); + String zid = String.format("00B0y000%07d", rowIndex); + String col4 = String.format("d%05d", rowIndex + rnd.nextInt(MAX_ROWS)); + String col5 = String.format("e%05d", rowIndex + rnd.nextInt(MAX_ROWS)); + String col6 = String.format("f%05d", rowIndex + rnd.nextInt(MAX_ROWS)); + String col7 = String.format("g%05d", rowIndex + rnd.nextInt(MAX_ROWS)); + String col8 = String.format("h%05d", rowIndex + rnd.nextInt(MAX_ROWS)); + String col9 = String.format("i%05d", rowIndex + rnd.nextInt(MAX_ROWS)); + + return Lists.newArrayList( + new Object[] { groupById, zid, col4, col5, col6, col7, col8, col9 }); + } + }; + + // Create a test data reader/writer for the above schema. + DataWriter dataWriter = new BasicDataWriter(); + List<String> columns = + Lists.newArrayList("ID", "ZID", "COL4", "COL5", "COL6", "COL7", "COL8", "COL9"); + List<String> rowKeyColumns = Lists.newArrayList("ID", "ZID"); + String tenantConnectUrl = + getUrl() + ';' + TENANT_ID_ATTRIB + '=' + schemaBuilder.getDataOptions().getTenantId(); + try (Connection writeConnection = DriverManager.getConnection(tenantConnectUrl)) { + writeConnection.setAutoCommit(true); + dataWriter.setConnection(writeConnection); + dataWriter.setDataSupplier(dataSupplier); + dataWriter.setUpsertColumns(columns); + dataWriter.setRowKeyColumns(rowKeyColumns); + dataWriter.setTargetEntity(schemaBuilder.getEntityTenantViewName()); + upsertData(dataWriter, DEFAULT_NUM_ROWS); + + // Case : count(1) sql + DataReader dataReader = new BasicDataReader(); + dataReader.setValidationColumns(Arrays.asList("num_rows")); + dataReader.setRowKeyColumns(Arrays.asList("num_rows")); + dataReader.setDML(String + .format("SELECT count(1) as num_rows from %s HAVING count(1) > 0", + schemaBuilder.getEntityTenantViewName())); + dataReader.setTargetEntity(schemaBuilder.getEntityTenantViewName()); + + // Validate data before and after ttl expiration. + validateExpiredRowsAreNotReturnedUsingCounts(phoenixTTL, dataReader, schemaBuilder); + + // Case : group by sql + dataReader.setValidationColumns(Arrays.asList("num_rows")); + dataReader.setRowKeyColumns(Arrays.asList("num_rows")); + dataReader.setDML(String + .format("SELECT count(1) as num_rows from %s GROUP BY ID HAVING count(1) > 0", + schemaBuilder.getEntityTenantViewName())); + + dataReader.setTargetEntity(schemaBuilder.getEntityTenantViewName()); + + // Validate data before and after ttl expiration. + validateExpiredRowsAreNotReturnedUsingCounts(phoenixTTL, dataReader, schemaBuilder); + } + } + } + + @Test public void testWithVariousSQLsForMultipleViews() throws Exception { + + // PHOENIX TTL is set in seconds (for e.g 10 secs) + long phoenixTTL = 10; + TableOptions tableOptions = TableOptions.withDefaults(); + tableOptions.getTableColumns().clear(); + tableOptions.getTableColumnTypes().clear(); + + TenantViewOptions tenantViewOptions = new TenantViewOptions(); + tenantViewOptions.setTenantViewColumns(asList("ZID", "COL7", "COL8", "COL9")); + tenantViewOptions + .setTenantViewColumnTypes(asList("CHAR(15)", "VARCHAR", "VARCHAR", "VARCHAR")); + + tenantViewOptions.setTableProps(String.format("PHOENIX_TTL=%d", phoenixTTL)); + + OtherOptions testCaseWhenAllCFMatchAndAllDefault = new OtherOptions(); + testCaseWhenAllCFMatchAndAllDefault.setTestName("testCaseWhenAllCFMatchAndAllDefault"); + testCaseWhenAllCFMatchAndAllDefault + .setTableCFs(Lists.newArrayList((String) null, null, null)); + testCaseWhenAllCFMatchAndAllDefault + .setGlobalViewCFs(Lists.newArrayList((String) null, null, null)); + testCaseWhenAllCFMatchAndAllDefault + .setTenantViewCFs(Lists.newArrayList((String) null, null, null, null)); + + final SchemaBuilder schemaBuilder = new SchemaBuilder(getUrl()); + schemaBuilder.withTableOptions(tableOptions).withTenantViewOptions(tenantViewOptions) + .withOtherOptions(testCaseWhenAllCFMatchAndAllDefault); + + for (int view : Arrays.asList(new Integer[] { 1, 2, 3 })) { + // build schema for new view + schemaBuilder.buildNewView(); + + // Define the test data. + DataSupplier dataSupplier = new DataSupplier() { + + @Override public List<Object> getValues(int rowIndex) { + Random rnd = new Random(); + String zid = String.format("00B0y000%07d", rowIndex); + String col7 = String.format("g%05d", rowIndex + rnd.nextInt(MAX_ROWS)); + String col8 = String.format("h%05d", rowIndex + rnd.nextInt(MAX_ROWS)); + String col9 = String.format("i%05d", rowIndex + rnd.nextInt(MAX_ROWS)); + + return Lists.newArrayList(new Object[] { zid, col7, col8, col9 }); + } + }; + + // Create a test data reader/writer for the above schema. + DataWriter dataWriter = new BasicDataWriter(); + List<String> columns = Lists.newArrayList("ZID", "COL7", "COL8", "COL9"); + List<String> rowKeyColumns = Lists.newArrayList("ZID"); + String tenantConnectUrl = + getUrl() + ';' + TENANT_ID_ATTRIB + '=' + schemaBuilder.getDataOptions().getTenantId(); + try (Connection writeConnection = DriverManager.getConnection(tenantConnectUrl)) { + writeConnection.setAutoCommit(true); + dataWriter.setConnection(writeConnection); + dataWriter.setDataSupplier(dataSupplier); + dataWriter.setUpsertColumns(columns); + dataWriter.setRowKeyColumns(rowKeyColumns); + dataWriter.setTargetEntity(schemaBuilder.getEntityTenantViewName()); + upsertData(dataWriter, DEFAULT_NUM_ROWS); + + // Case : count(1) sql + DataReader dataReader = new BasicDataReader(); + dataReader.setValidationColumns(Arrays.asList("num_rows")); + dataReader.setRowKeyColumns(Arrays.asList("num_rows")); + dataReader.setDML(String + .format("SELECT count(1) as num_rows from %s HAVING count(1) > 0", + schemaBuilder.getEntityTenantViewName())); + dataReader.setTargetEntity(schemaBuilder.getEntityTenantViewName()); + + // Validate data before and after ttl expiration. + validateExpiredRowsAreNotReturnedUsingCounts(phoenixTTL, dataReader, schemaBuilder); + + // Case : group by sql + dataReader.setValidationColumns(Arrays.asList("num_rows")); + dataReader.setRowKeyColumns(Arrays.asList("num_rows")); + dataReader.setDML(String + .format("SELECT count(1) as num_rows from %s GROUP BY ZID HAVING count(1) > 0", + schemaBuilder.getEntityTenantViewName())); + + dataReader.setTargetEntity(schemaBuilder.getEntityTenantViewName()); + + // Validate data before and after ttl expiration. + validateExpiredRowsAreNotReturnedUsingCounts(phoenixTTL, dataReader, schemaBuilder); + } + } + } + + @Test public void testWithTenantViewAndGlobalViewAndVariousOptions() throws Exception { + // PHOENIX TTL is set in seconds (for e.g 10 secs) + long phoenixTTL = 10; + + // Define the test schema + TableOptions tableOptions = TableOptions.withDefaults(); + + GlobalViewOptions globalViewOptions = SchemaBuilder.GlobalViewOptions.withDefaults(); + globalViewOptions.setTableProps(String.format("PHOENIX_TTL=%d", phoenixTTL)); + + GlobalViewIndexOptions globalViewIndexOptions = GlobalViewIndexOptions.withDefaults(); + + TenantViewOptions tenantViewOptions = new TenantViewOptions(); + tenantViewOptions.setTenantViewColumns(asList("ZID", "COL7", "COL8", "COL9")); + tenantViewOptions + .setTenantViewColumnTypes(asList("CHAR(15)", "VARCHAR", "VARCHAR", "VARCHAR")); + + TenantViewIndexOptions tenantViewIndexOptions = TenantViewIndexOptions.withDefaults(); + + OtherOptions testCaseWhenAllCFMatchAndAllDefault = new OtherOptions(); + testCaseWhenAllCFMatchAndAllDefault.setTestName("testCaseWhenAllCFMatchAndAllDefault"); + testCaseWhenAllCFMatchAndAllDefault + .setTableCFs(Lists.newArrayList((String) null, null, null)); + testCaseWhenAllCFMatchAndAllDefault + .setGlobalViewCFs(Lists.newArrayList((String) null, null, null)); + testCaseWhenAllCFMatchAndAllDefault + .setTenantViewCFs(Lists.newArrayList((String) null, null, null, null)); + + for (String additionalProps : Lists + .newArrayList("COLUMN_ENCODED_BYTES=0", "DEFAULT_COLUMN_FAMILY='0'")) { + + StringBuilder withTableProps = new StringBuilder(); + withTableProps.append("MULTI_TENANT=true,").append(additionalProps); + + for (boolean isGlobalViewLocal : Lists.newArrayList(true, false)) { + for (boolean isTenantViewLocal : Lists.newArrayList(true, false)) { + + tableOptions.setTableProps(withTableProps.toString()); + globalViewIndexOptions.setLocal(isGlobalViewLocal); + tenantViewIndexOptions.setLocal(isTenantViewLocal); + OtherOptions otherOptions = testCaseWhenAllCFMatchAndAllDefault; + + final SchemaBuilder schemaBuilder = new SchemaBuilder(getUrl()); + schemaBuilder.withTableOptions(tableOptions) + .withGlobalViewOptions(globalViewOptions) + .withGlobalViewIndexOptions(globalViewIndexOptions) + .withTenantViewOptions(tenantViewOptions) + .withTenantViewIndexOptions(tenantViewIndexOptions) + .withOtherOptions(testCaseWhenAllCFMatchAndAllDefault) + .buildWithNewTenant(); + + // Define the test data. + DataSupplier dataSupplier = new DataSupplier() { + + @Override public List<Object> getValues(int rowIndex) { + Random rnd = new Random(); + String id = String.format("00A0y000%07d", rowIndex); + String zid = String.format("00B0y000%07d", rowIndex); + String col1 = String.format("a%05d", rowIndex + rnd.nextInt(MAX_ROWS)); + String col2 = String.format("b%05d", rowIndex + rnd.nextInt(MAX_ROWS)); + String col3 = String.format("c%05d", rowIndex + rnd.nextInt(MAX_ROWS)); + String col4 = String.format("d%05d", rowIndex + rnd.nextInt(MAX_ROWS)); + String col5 = String.format("e%05d", rowIndex + rnd.nextInt(MAX_ROWS)); + String col6 = String.format("f%05d", rowIndex + rnd.nextInt(MAX_ROWS)); + String col7 = String.format("g%05d", rowIndex + rnd.nextInt(MAX_ROWS)); + String col8 = String.format("h%05d", rowIndex + rnd.nextInt(MAX_ROWS)); + String col9 = String.format("i%05d", rowIndex + rnd.nextInt(MAX_ROWS)); + + return Lists.newArrayList( + new Object[] { id, zid, col1, col2, col3, col4, col5, col6, + col7, col8, col9 }); + } + }; + + // Create a test data reader/writer for the above schema. + DataWriter dataWriter = new BasicDataWriter(); + DataReader dataReader = new BasicDataReader(); + + List<String> columns = + Lists.newArrayList("ID", "ZID", + "COL1", "COL2", "COL3", "COL4", "COL5", + "COL6", "COL7", "COL8", "COL9"); + List<String> rowKeyColumns = Lists.newArrayList("ID", "ZID"); + String tenantConnectUrl = + getUrl() + ';' + TENANT_ID_ATTRIB + '=' + + schemaBuilder.getDataOptions().getTenantId(); + try (Connection writeConnection = DriverManager + .getConnection(tenantConnectUrl)) { + writeConnection.setAutoCommit(true); + dataWriter.setConnection(writeConnection); + dataWriter.setDataSupplier(dataSupplier); + dataWriter.setUpsertColumns(columns); + dataWriter.setRowKeyColumns(rowKeyColumns); + dataWriter.setTargetEntity(schemaBuilder.getEntityTenantViewName()); + + dataReader.setValidationColumns(columns); + dataReader.setRowKeyColumns(rowKeyColumns); + dataReader.setDML(String + .format("SELECT %s from %s", Joiner.on(",").join(columns), + schemaBuilder.getEntityTenantViewName())); + dataReader.setTargetEntity(schemaBuilder.getEntityTenantViewName()); + + // Validate data before and after ttl expiration. + upsertDataAndRunValidations(phoenixTTL, DEFAULT_NUM_ROWS, dataWriter, + dataReader, schemaBuilder); + } + + long scnTimestamp = EnvironmentEdgeManager.currentTimeMillis() + + (phoenixTTL * 1000); + // Delete expired rows. + deleteData(schemaBuilder, scnTimestamp); + + // Verify after deleting TTL expired data. + Properties props = new Properties(); + props.setProperty("CurrentSCN", Long.toString(scnTimestamp)); + + try (Connection readConnection = DriverManager + .getConnection(tenantConnectUrl, props)) { + + dataReader.setConnection(readConnection); + com.google.common.collect.Table<String, String, Object> + fetchedData = + fetchData(dataReader); + assertTrue("Deleted rows should not be fetched", + fetchedData.rowKeySet().size() == 0); + } + } + } + } + } + + /** + * ************************************************************ + * Case 1: Build schema with TTL set by the tenant view. + * TTL for GLOBAL_VIEW - 300000ms (not set) + * TTL for TENANT_VIEW - 300000ms + * ************************************************************ + */ + + @Test public void testGlobalAndTenantViewTTLInheritance1() throws Exception { + // PHOENIX TTL is set in seconds (for e.g 200 secs) + long tenantPhoenixTTL = 200; + + // Define the test schema. + // 1. Table with default columns => (ORG_ID, KP, COL1, COL2, COL3), PK => (ORG_ID, KP) + // 2. GlobalView with default columns => (ID, COL4, COL5, COL6), PK => (ID) + // 3. Tenant with default columns => (ZID, COL7, COL8, COL9), PK => (ZID) + final SchemaBuilder schemaBuilder = new SchemaBuilder(getUrl()); + + TableOptions tableOptions = TableOptions.withDefaults(); + tableOptions.setTableProps("COLUMN_ENCODED_BYTES=0,MULTI_TENANT=true"); + + SchemaBuilder.GlobalViewOptions globalViewOptions = + SchemaBuilder.GlobalViewOptions.withDefaults(); + + SchemaBuilder.GlobalViewIndexOptions globalViewIndexOptions = + SchemaBuilder.GlobalViewIndexOptions.withDefaults(); + globalViewIndexOptions.setLocal(false); + + TenantViewOptions tenantViewWithOverrideOptions = new TenantViewOptions(); + tenantViewWithOverrideOptions.setTenantViewColumns(asList("ZID", "COL7", "COL8", "COL9")); + tenantViewWithOverrideOptions + .setTenantViewColumnTypes(asList("CHAR(15)", "VARCHAR", "VARCHAR", "VARCHAR")); + tenantViewWithOverrideOptions.setTableProps(String.format("PHOENIX_TTL=%d", tenantPhoenixTTL)); + + OtherOptions testCaseWhenAllCFMatchAndAllDefault = new OtherOptions(); + testCaseWhenAllCFMatchAndAllDefault.setTestName("testCaseWhenAllCFMatchAndAllDefault"); + testCaseWhenAllCFMatchAndAllDefault + .setTableCFs(Lists.newArrayList((String) null, null, null)); + testCaseWhenAllCFMatchAndAllDefault + .setGlobalViewCFs(Lists.newArrayList((String) null, null, null)); + testCaseWhenAllCFMatchAndAllDefault + .setTenantViewCFs(Lists.newArrayList((String) null, null, null, null)); + + /** + * ************************************************************ + * Case 1: Build schema with TTL set by the tenant view. + * TTL for GLOBAL_VIEW - 300000ms (not set) + * TTL for TENANT_VIEW - 30000ms + * ************************************************************ + */ + + schemaBuilder.withTableOptions(tableOptions).withGlobalViewOptions(globalViewOptions) + .withGlobalViewIndexOptions(globalViewIndexOptions) + .withTenantViewOptions(tenantViewWithOverrideOptions).withTenantViewIndexDefaults() + .withOtherOptions(testCaseWhenAllCFMatchAndAllDefault).buildWithNewTenant(); + + // Define the test data. + final String id = String.format("00A0y000%07d", 0); + DataSupplier dataSupplier = new DataSupplier() { + + @Override public List<Object> getValues(int rowIndex) { + Random rnd = new Random(); + String zid = String.format("00B0y000%07d", rowIndex); + String col1 = String.format("a%05d", rowIndex + rnd.nextInt(MAX_ROWS)); + String col2 = String.format("b%05d", rowIndex + rnd.nextInt(MAX_ROWS)); + String col3 = String.format("c%05d", rowIndex + rnd.nextInt(MAX_ROWS)); + String col4 = String.format("d%05d", rowIndex + rnd.nextInt(MAX_ROWS)); + String col5 = String.format("e%05d", rowIndex + rnd.nextInt(MAX_ROWS)); + String col6 = String.format("f%05d", rowIndex + rnd.nextInt(MAX_ROWS)); + String col7 = String.format("g%05d", rowIndex + rnd.nextInt(MAX_ROWS)); + String col8 = String.format("h%05d", rowIndex + rnd.nextInt(MAX_ROWS)); + String col9 = String.format("i%05d", rowIndex + rnd.nextInt(MAX_ROWS)); + + return Lists.newArrayList( + new Object[] { id, zid, col1, col2, col3, col4, col5, col6, col7, col8, + col9 }); + } + }; + + // Create a test data reader/writer for the above schema. + DataWriter dataWriter = new BasicDataWriter(); + List<String> columns = + Lists.newArrayList("ID", "ZID", + "COL1", "COL2", "COL3", "COL4", "COL5", "COL6", + "COL7", "COL8", "COL9"); + List<String> rowKeyColumns = Lists.newArrayList("ID", "ZID"); + String tenant1ConnectUrl = + getUrl() + ';' + TENANT_ID_ATTRIB + '=' + schemaBuilder.getDataOptions().getTenantId(); + try (Connection writeConnection = DriverManager.getConnection(tenant1ConnectUrl)) { + writeConnection.setAutoCommit(true); + dataWriter.setConnection(writeConnection); + dataWriter.setDataSupplier(dataSupplier); + dataWriter.setUpsertColumns(columns); + dataWriter.setRowKeyColumns(rowKeyColumns); + dataWriter.setTargetEntity(schemaBuilder.getEntityTenantViewName()); + upsertData(dataWriter, DEFAULT_NUM_ROWS); + + // Case : count(1) sql + DataReader dataReader = new BasicDataReader(); + dataReader.setValidationColumns(Arrays.asList("num_rows")); + dataReader.setRowKeyColumns(Arrays.asList("num_rows")); + dataReader.setDML(String + .format("SELECT count(1) as num_rows from %s HAVING count(1) > 0", + schemaBuilder.getEntityTenantViewName())); + dataReader.setTargetEntity(schemaBuilder.getEntityTenantViewName()); + + // Validate data exists before ttl expiration. + long probeTimestamp = EnvironmentEdgeManager.currentTimeMillis() + + ((tenantPhoenixTTL * 1000) / 2); + validateRowsAreNotMaskedUsingCounts(probeTimestamp, dataReader, schemaBuilder); + // Validate data before and after ttl expiration. + // Use the tenant phoenix ttl since that is what the view has set. + validateExpiredRowsAreNotReturnedUsingCounts(tenantPhoenixTTL, dataReader, schemaBuilder); + } + } + + /** + * ************************************************************ + * Case 2: Build schema with TTL set by the global view. + * TTL for GLOBAL_VIEW - 300000ms + * TTL for TENANT_VIEW - 300000ms (not set, uses global view ttl) + * ************************************************************ + */ + + @Test public void testGlobalAndTenantViewTTLInheritance2() throws Exception { + // PHOENIX TTL is set in seconds (for e.g 300 secs) + long globalPhoenixTTL = 300; + + // Define the test schema. + // 1. Table with default columns => (ORG_ID, KP, COL1, COL2, COL3), PK => (ORG_ID, KP) + // 2. GlobalView with default columns => (ID, COL4, COL5, COL6), PK => (ID) + // 3. Tenant with default columns => (ZID, COL7, COL8, COL9), PK => (ZID) + final SchemaBuilder schemaBuilder = new SchemaBuilder(getUrl()); + + TableOptions tableOptions = TableOptions.withDefaults(); + tableOptions.setTableProps("COLUMN_ENCODED_BYTES=0,MULTI_TENANT=true"); + + SchemaBuilder.GlobalViewOptions globalViewOptions = + SchemaBuilder.GlobalViewOptions.withDefaults(); + globalViewOptions.setTableProps(String.format("PHOENIX_TTL=%d", globalPhoenixTTL)); + + SchemaBuilder.GlobalViewIndexOptions globalViewIndexOptions = + SchemaBuilder.GlobalViewIndexOptions.withDefaults(); + globalViewIndexOptions.setLocal(false); + + TenantViewOptions tenantViewWithOverrideOptions = new TenantViewOptions(); + tenantViewWithOverrideOptions.setTenantViewColumns(asList("ZID", "COL7", "COL8", "COL9")); + tenantViewWithOverrideOptions + .setTenantViewColumnTypes(asList("CHAR(15)", "VARCHAR", "VARCHAR", "VARCHAR")); + + OtherOptions testCaseWhenAllCFMatchAndAllDefault = new OtherOptions(); + testCaseWhenAllCFMatchAndAllDefault.setTestName("testCaseWhenAllCFMatchAndAllDefault"); + testCaseWhenAllCFMatchAndAllDefault + .setTableCFs(Lists.newArrayList((String) null, null, null)); + testCaseWhenAllCFMatchAndAllDefault + .setGlobalViewCFs(Lists.newArrayList((String) null, null, null)); + testCaseWhenAllCFMatchAndAllDefault + .setTenantViewCFs(Lists.newArrayList((String) null, null, null, null)); + + /** + * ************************************************************ + * Case 2: Build schema with TTL set by the global view. + * TTL for GLOBAL_VIEW - 300000ms + * TTL for TENANT_VIEW - 300000ms (not set, uses global view ttl) + * ************************************************************ + */ + + schemaBuilder.withTableOptions(tableOptions).withGlobalViewOptions(globalViewOptions) + .withGlobalViewIndexOptions(globalViewIndexOptions) + .withTenantViewOptions(tenantViewWithOverrideOptions).withTenantViewIndexDefaults() + .withOtherOptions(testCaseWhenAllCFMatchAndAllDefault).buildWithNewTenant(); + + // Define the test data. + final String id = String.format("00A0y000%07d", 0); + DataSupplier dataSupplier = new DataSupplier() { + + @Override public List<Object> getValues(int rowIndex) { + Random rnd = new Random(); + String zid = String.format("00B0y000%07d", rowIndex); + String col1 = String.format("a%05d", rowIndex + rnd.nextInt(MAX_ROWS)); + String col2 = String.format("b%05d", rowIndex + rnd.nextInt(MAX_ROWS)); + String col3 = String.format("c%05d", rowIndex + rnd.nextInt(MAX_ROWS)); + String col4 = String.format("d%05d", rowIndex + rnd.nextInt(MAX_ROWS)); + String col5 = String.format("e%05d", rowIndex + rnd.nextInt(MAX_ROWS)); + String col6 = String.format("f%05d", rowIndex + rnd.nextInt(MAX_ROWS)); + String col7 = String.format("g%05d", rowIndex + rnd.nextInt(MAX_ROWS)); + String col8 = String.format("h%05d", rowIndex + rnd.nextInt(MAX_ROWS)); + String col9 = String.format("i%05d", rowIndex + rnd.nextInt(MAX_ROWS)); + + return Lists.newArrayList( + new Object[] { id, zid, col1, col2, col3, col4, col5, col6, col7, col8, + col9 }); + } + }; + + // Create a test data reader/writer for the above schema. + DataWriter dataWriter = new BasicDataWriter(); + List<String> columns = + Lists.newArrayList("ID", "ZID", + "COL1", "COL2", "COL3", "COL4", "COL5", "COL6", + "COL7", "COL8", "COL9"); + List<String> rowKeyColumns = Lists.newArrayList("ID", "ZID"); + String tenant1ConnectUrl = + getUrl() + ';' + TENANT_ID_ATTRIB + '=' + schemaBuilder.getDataOptions().getTenantId(); + try (Connection writeConnection = DriverManager.getConnection(tenant1ConnectUrl)) { + writeConnection.setAutoCommit(true); + dataWriter.setConnection(writeConnection); + dataWriter.setDataSupplier(dataSupplier); + dataWriter.setUpsertColumns(columns); + dataWriter.setRowKeyColumns(rowKeyColumns); + dataWriter.setTargetEntity(schemaBuilder.getEntityTenantViewName()); + upsertData(dataWriter, DEFAULT_NUM_ROWS); + + // Case : count(1) sql + DataReader dataReader = new BasicDataReader(); + dataReader.setValidationColumns(Arrays.asList("num_rows")); + dataReader.setRowKeyColumns(Arrays.asList("num_rows")); + dataReader.setDML(String + .format("SELECT count(1) as num_rows from %s HAVING count(1) > 0", + schemaBuilder.getEntityTenantViewName())); + dataReader.setTargetEntity(schemaBuilder.getEntityTenantViewName()); + + // Validate data exists before ttl expiration. + long probeTimestamp = EnvironmentEdgeManager.currentTimeMillis() + + ((globalPhoenixTTL * 1000) / 2); + validateRowsAreNotMaskedUsingCounts(probeTimestamp, dataReader, schemaBuilder); + // Validate data before and after ttl expiration. + // Use the global phoenix ttl since that is what the view has inherited. + validateExpiredRowsAreNotReturnedUsingCounts(globalPhoenixTTL, dataReader, schemaBuilder); + } + } + + @Test public void testDeleteIfExpiredOnTenantView() throws Exception { + + // PHOENIX TTL is set in seconds (for e.g 10 secs) + long phoenixTTL = 10; + TableOptions tableOptions = TableOptions.withDefaults(); + tableOptions.getTableColumns().clear(); + tableOptions.getTableColumnTypes().clear(); + + TenantViewOptions tenantViewOptions = TenantViewOptions.withDefaults(); + tenantViewOptions.setTableProps(String.format("PHOENIX_TTL=%d", phoenixTTL)); + + // Define the test schema. + final SchemaBuilder schemaBuilder = new SchemaBuilder(getUrl()); + schemaBuilder.withTableOptions(tableOptions).withTenantViewOptions(tenantViewOptions) + .build(); + + // Define the test data. + DataSupplier dataSupplier = new DataSupplier() { + + @Override public List<Object> getValues(int rowIndex) { + Random rnd = new Random(); + String zid = String.format("00A0y000%07d", rowIndex); + String col7 = String.format("g%05d", rowIndex + rnd.nextInt(MAX_ROWS)); + String col8 = String.format("h%05d", rowIndex + rnd.nextInt(MAX_ROWS)); + String col9 = String.format("i%05d", rowIndex + rnd.nextInt(MAX_ROWS)); + return Lists.newArrayList(new Object[] { zid, col7, col8, col9 }); + } + }; + + // Create a test data reader/writer for the above schema. + DataWriter dataWriter = new BasicDataWriter(); + DataReader dataReader = new BasicDataReader(); + + List<String> columns = Lists.newArrayList("ZID", "COL7", "COL8", "COL9"); + List<String> rowKeyColumns = Lists.newArrayList("ZID"); + String tenantConnectUrl = + getUrl() + ';' + TENANT_ID_ATTRIB + '=' + schemaBuilder.getDataOptions().getTenantId(); + try (Connection writeConnection = DriverManager.getConnection(tenantConnectUrl)) { + writeConnection.setAutoCommit(true); + dataWriter.setConnection(writeConnection); + dataWriter.setDataSupplier(dataSupplier); + dataWriter.setUpsertColumns(columns); + dataWriter.setRowKeyColumns(rowKeyColumns); + dataWriter.setTargetEntity(schemaBuilder.getEntityTenantViewName()); + + dataReader.setValidationColumns(columns); + dataReader.setRowKeyColumns(rowKeyColumns); + dataReader.setDML(String.format("SELECT %s from %s", Joiner.on(",").join(columns), + schemaBuilder.getEntityTenantViewName())); + dataReader.setTargetEntity(schemaBuilder.getEntityTenantViewName()); + + // Validate data before and after ttl expiration. + upsertDataAndRunValidations(phoenixTTL, DEFAULT_NUM_ROWS, dataWriter, dataReader, + schemaBuilder); + } + + long scnTimestamp = EnvironmentEdgeManager.currentTimeMillis() + (phoenixTTL * 1000); + // Delete expired rows. + deleteData(schemaBuilder, scnTimestamp); + + // Verify after deleting TTL expired data. + Properties props = new Properties(); + props.setProperty("CurrentSCN", Long.toString(scnTimestamp)); + + try (Connection readConnection = DriverManager.getConnection(tenantConnectUrl, props)) { + + dataReader.setConnection(readConnection); + com.google.common.collect.Table<String, String, Object> + fetchedData = + fetchData(dataReader); + assertTrue("Deleted rows should not be fetched", fetchedData.rowKeySet().size() == 0); + } + } + + private void upsertDataAndRunValidations(long phoenixTTL, int numRowsToUpsert, + DataWriter dataWriter, DataReader dataReader, SchemaBuilder schemaBuilder) + throws IOException, SQLException { + + //Insert for the first time and validate them. + validateExpiredRowsAreNotReturnedUsingData(phoenixTTL, upsertData(dataWriter, numRowsToUpsert), + dataReader, schemaBuilder); + + // Update the above rows and validate the same. + validateExpiredRowsAreNotReturnedUsingData(phoenixTTL, upsertData(dataWriter, numRowsToUpsert), + dataReader, schemaBuilder); + + } + + private void validateExpiredRowsAreNotReturnedUsingCounts(long phoenixTTL, DataReader dataReader, + SchemaBuilder schemaBuilder) throws SQLException { + + String tenantConnectUrl = + getUrl() + ';' + TENANT_ID_ATTRIB + '=' + schemaBuilder.getDataOptions().getTenantId(); + + // Verify before TTL expiration + try (Connection readConnection = DriverManager.getConnection(tenantConnectUrl)) { + + dataReader.setConnection(readConnection); + com.google.common.collect.Table<String, String, Object> + fetchedData = + fetchData(dataReader); + assertTrue("Fetched data should not be null", fetchedData != null); + assertTrue("Rows should exists before expiration", fetchedData.rowKeySet().size() > 0); + } + + // Verify after TTL expiration + long scnTimestamp = EnvironmentEdgeManager.currentTimeMillis(); + Properties props = new Properties(); + props.setProperty("CurrentSCN", Long.toString(scnTimestamp + (2 * phoenixTTL * 1000))); + try (Connection readConnection = DriverManager.getConnection(tenantConnectUrl, props)) { + + dataReader.setConnection(readConnection); + com.google.common.collect.Table<String, String, Object> + fetchedData = + fetchData(dataReader); + assertTrue("Fetched data should not be null", fetchedData != null); + assertTrue("Expired rows should not be fetched", fetchedData.rowKeySet().size() == 0); + } + } + + private void validateExpiredRowsAreNotReturnedUsingData(long phoenixTTL, + com.google.common.collect.Table<String, String, Object> upsertedData, + DataReader dataReader, SchemaBuilder schemaBuilder) throws SQLException { + + String tenantConnectUrl = + getUrl() + ';' + TENANT_ID_ATTRIB + '=' + schemaBuilder.getDataOptions().getTenantId(); + + // Verify before TTL expiration + long scnTimestamp = EnvironmentEdgeManager.currentTimeMillis(); + Properties props = new Properties(); + props.setProperty("CurrentSCN", Long.toString(scnTimestamp)); + try (Connection readConnection = DriverManager.getConnection(tenantConnectUrl, props)) { + + dataReader.setConnection(readConnection); + com.google.common.collect.Table<String, String, Object> + fetchedData = + fetchData(dataReader); + assertTrue("Upserted data should not be null", upsertedData != null); + assertTrue("Fetched data should not be null", fetchedData != null); + + verifyRowsBeforeTTLExpiration(upsertedData, fetchedData); + } + + // Verify after TTL expiration + props.setProperty("CurrentSCN", Long.toString(scnTimestamp + (2 * phoenixTTL * 1000))); + try (Connection readConnection = DriverManager.getConnection(tenantConnectUrl, props)) { + + dataReader.setConnection(readConnection); + com.google.common.collect.Table<String, String, Object> + fetchedData = + fetchData(dataReader); + assertTrue("Fetched data should not be null", fetchedData != null); + assertTrue("Expired rows should not be fetched", fetchedData.rowKeySet().size() == 0); + } + + } + + private void validateRowsAreNotMaskedUsingCounts(long probeTimestamp, DataReader dataReader, + SchemaBuilder schemaBuilder) throws SQLException { + + String tenantConnectUrl = + getUrl() + ';' + TENANT_ID_ATTRIB + '=' + schemaBuilder.getDataOptions() + .getTenantId(); + + // Verify rows exists (not masked) at current time + long scnTimestamp = EnvironmentEdgeManager.currentTimeMillis(); + Properties props = new Properties(); + props.setProperty("CurrentSCN", Long.toString(scnTimestamp )); + try (Connection readConnection = DriverManager.getConnection(tenantConnectUrl, props)) { + + dataReader.setConnection(readConnection); + com.google.common.collect.Table<String, String, Object> + fetchedData = + fetchData(dataReader); + assertTrue("Fetched data should not be null", fetchedData != null); + assertTrue("Rows should exists before ttl expiration (now)", + fetchedData.rowKeySet().size() > 0); + } + + // Verify rows exists (not masked) at probed timestamp + props.setProperty("CurrentSCN", Long.toString(probeTimestamp)); + try (Connection readConnection = DriverManager.getConnection(tenantConnectUrl, props)) { + + dataReader.setConnection(readConnection); + com.google.common.collect.Table<String, String, Object> + fetchedData = + fetchData(dataReader); + assertTrue("Fetched data should not be null", fetchedData != null); + assertTrue("Rows should exists before ttl expiration (probe-timestamp)", + fetchedData.rowKeySet().size() > 0); + } + } + + private void verifyRowsBeforeTTLExpiration( + com.google.common.collect.Table<String, String, Object> upsertedData, + com.google.common.collect.Table<String, String, Object> fetchedData) { + + Set<String> upsertedRowKeys = upsertedData.rowKeySet(); + Set<String> fetchedRowKeys = fetchedData.rowKeySet(); + assertTrue("Upserted row keys should not be null", upsertedRowKeys != null); + assertTrue("Fetched row keys should not be null", fetchedRowKeys != null); + assertTrue(String.format("Rows upserted and fetched do not match, upserted=%d, fetched=%d", + upsertedRowKeys.size(), fetchedRowKeys.size()), + upsertedRowKeys.equals(fetchedRowKeys)); + + Set<String> fetchedCols = fetchedData.columnKeySet(); + for (String rowKey : fetchedRowKeys) { + for (String columnKey : fetchedCols) { + Object upsertedValue = upsertedData.get(rowKey, columnKey); + Object fetchedValue = fetchedData.get(rowKey, columnKey); + assertTrue("Upserted values should not be null", upsertedValue != null); + assertTrue("Fetched values should not be null", fetchedValue != null); + assertTrue("Values upserted and fetched do not match", + upsertedValue.equals(fetchedValue)); + } + } + } + + private com.google.common.collect.Table<String, String, Object> upsertData( + DataWriter dataWriter, int numRowsToUpsert) throws SQLException { + // Upsert rows + dataWriter.upsertRows(1, numRowsToUpsert); + return dataWriter.getDataTable(); + } + + private com.google.common.collect.Table<String, String, Object> fetchData(DataReader dataReader) + throws SQLException { + + dataReader.readRows(); + return dataReader.getDataTable(); + } + + private void deleteData(SchemaBuilder schemaBuilder, long scnTimestamp) throws SQLException { + + String viewName = schemaBuilder.getEntityTenantViewName(); + + Properties props = new Properties(); + props.setProperty("CurrentSCN", Long.toString(scnTimestamp)); + String tenantConnectUrl = + getUrl() + ';' + TENANT_ID_ATTRIB + '=' + schemaBuilder.getDataOptions().getTenantId(); + + try (Connection deleteConnection = DriverManager.getConnection(tenantConnectUrl, props); + final Statement statement = deleteConnection.createStatement()) { + deleteConnection.setAutoCommit(true); + + final String deleteIfExpiredStatement = String.format("select * from %s", viewName); + Preconditions.checkNotNull(deleteIfExpiredStatement); + + final PhoenixStatement pstmt = statement.unwrap(PhoenixStatement.class); + // Optimize the query plan so that we potentially use secondary indexes + final QueryPlan queryPlan = pstmt.optimizeQuery(deleteIfExpiredStatement); + final Scan scan = queryPlan.getContext().getScan(); + + PTable table = PhoenixRuntime + .getTable(deleteConnection, schemaBuilder.getDataOptions().getTenantId(), viewName); + + byte[] emptyColumnFamilyName = SchemaUtil.getEmptyColumnFamily(table); + byte[] emptyColumnName = + table.getEncodingScheme() == PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS ? + QueryConstants.EMPTY_COLUMN_BYTES : + table.getEncodingScheme().encode(QueryConstants.ENCODED_EMPTY_COLUMN_NAME); + + scan.setAttribute(BaseScannerRegionObserver.EMPTY_COLUMN_FAMILY_NAME, emptyColumnFamilyName); + scan.setAttribute(BaseScannerRegionObserver.EMPTY_COLUMN_QUALIFIER_NAME, emptyColumnName); + scan.setAttribute(BaseScannerRegionObserver.DELETE_PHOENIX_TTL_EXPIRED, PDataType.TRUE_BYTES); + scan.setAttribute(BaseScannerRegionObserver.MASK_PHOENIX_TTL_EXPIRED, PDataType.FALSE_BYTES); + scan.setAttribute(BaseScannerRegionObserver.PHOENIX_TTL, Bytes.toBytes(Long.valueOf(table.getPhoenixTTL()))); + PhoenixResultSet + rs = pstmt.newResultSet(queryPlan.iterator(), queryPlan.getProjector(), queryPlan.getContext()); Review comment: @jpisaac can you address this comment please? ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected]
