jpisaac commented on a change in pull request #766:
URL: https://github.com/apache/phoenix/pull/766#discussion_r413137856



##########
File path: phoenix-core/src/it/java/org/apache/phoenix/end2end/ViewTTLIT.java
##########
@@ -391,7 +402,1390 @@ public void testViewTTLWithAlterView() throws Exception {
         // Since the VIEW_TTL property values are not being overriden, we 
expect the TTL value to be different from the global view.
         assertSyscatHaveViewTTLRelatedColumns(tenantId, schemaName, 
tenantViewName, PTableType.VIEW.getSerializedValue(), 1000);
         assertSyscatHaveViewTTLRelatedColumns(tenantId, schemaName, 
indexOnTenantViewName, PTableType.INDEX.getSerializedValue(), 1000);
+    }
+
+    @Test
+    public void testViewTTLForLevelTwoViewWithNoIndexes() throws Exception {
+        long startTime = System.currentTimeMillis();
+
+        // Define the test schema.
+        // 1. Table with default columns => (ORG_ID, KP, COL1, COL2, COL3), PK 
=> (ORG_ID, KP)
+        // 2. GlobalView with default columns => (ID, COL4, COL5, COL6), PK => 
(ID)
+        // 3. Tenant with default columns => (ZID, COL7, COL8, COL9), PK => 
(ZID)
+        final SchemaBuilder schemaBuilder = new SchemaBuilder(getUrl());
+
+        TableOptions tableOptions = TableOptions.withDefaults();
+        tableOptions.setTableProps("COLUMN_ENCODED_BYTES=0,MULTI_TENANT=true");
+
+        SchemaBuilder.GlobalViewOptions
+                globalViewOptions = 
SchemaBuilder.GlobalViewOptions.withDefaults();
+        globalViewOptions.setTableProps("VIEW_TTL=300000");
+
+        TenantViewOptions tenantViewWithOverrideOptions = 
TenantViewOptions.withDefaults();
+        tenantViewWithOverrideOptions.setTableProps("VIEW_TTL=10000");
+        schemaBuilder
+                .withTableOptions(tableOptions)
+                .withGlobalViewOptions(globalViewOptions)
+                .withTenantViewOptions(tenantViewWithOverrideOptions)
+                .buildWithNewTenant();
+
+        String tenantId = schemaBuilder.getDataOptions().getTenantId();
+        String schemaName = 
stripQuotes(SchemaUtil.getSchemaNameFromFullName(schemaBuilder.getEntityTenantViewName()));
+        String globalViewName = 
stripQuotes(SchemaUtil.getTableNameFromFullName(schemaBuilder.getEntityGlobalViewName()));
+        String tenantViewName = 
stripQuotes(SchemaUtil.getTableNameFromFullName(schemaBuilder.getEntityTenantViewName()));
+
+        // Expected 2 rows - one for GlobalView, one for TenantView  each.
+        // Since the VIEW_TTL property values are being set, we expect the 
view header columns to show up in regular scans too.
+        
assertViewHeaderRowsHaveViewTTLRelatedCells(schemaBuilder.getTableOptions().getSchemaName(),
 startTime, false, 2);
+        assertSyscatHaveViewTTLRelatedColumns("", schemaName, globalViewName, 
PTableType.VIEW.getSerializedValue(), 300000);
+        // Since the VIEW_TTL property values are not being overriden, we 
expect the TTL value to be different from the global view.
+        assertSyscatHaveViewTTLRelatedColumns(tenantId, schemaName, 
tenantViewName, PTableType.VIEW.getSerializedValue(), 10000);
+
+        // Without override
+        startTime = System.currentTimeMillis();
+
+        TenantViewOptions tenantViewWithoutOverrideOptions = 
TenantViewOptions.withDefaults();
+        schemaBuilder
+                .withTableOptions(tableOptions)
+                .withGlobalViewOptions(globalViewOptions)
+                .withTenantViewOptions(tenantViewWithoutOverrideOptions)
+                .buildWithNewTenant();
+
+        tenantId = schemaBuilder.getDataOptions().getTenantId();
+        schemaName = 
stripQuotes(SchemaUtil.getSchemaNameFromFullName(schemaBuilder.getEntityTenantViewName()));
+        globalViewName = 
stripQuotes(SchemaUtil.getTableNameFromFullName(schemaBuilder.getEntityGlobalViewName()));
+        tenantViewName = 
stripQuotes(SchemaUtil.getTableNameFromFullName(schemaBuilder.getEntityTenantViewName()));
+
+        // Expected 1 rows - one for TenantView each.
+        // Since the VIEW_TTL property values are being set, we expect the 
view header columns to show up in regular scans too.
+        
assertViewHeaderRowsHaveViewTTLRelatedCells(schemaBuilder.getTableOptions().getSchemaName(),
 startTime, false, 1);
+        assertSyscatHaveViewTTLRelatedColumns("", schemaName, globalViewName, 
PTableType.VIEW.getSerializedValue(), 300000);
+        // Since the VIEW_TTL property values are not being overriden, we 
expect the TTL value to be same as the global view.
+        assertSyscatHaveViewTTLRelatedColumns(tenantId, schemaName, 
tenantViewName, PTableType.VIEW.getSerializedValue(), 300000);
+    }
+
+    @Test
+    public void testWithTenantViewAndNoGlobalView() throws Exception {
+
+        long viewTTL = 10000;
+        TableOptions tableOptions = TableOptions.withDefaults();
+        tableOptions.getTableColumns().clear();
+        tableOptions.getTableColumnTypes().clear();
+
+        TenantViewOptions tenantViewOptions = TenantViewOptions.withDefaults();
+        tenantViewOptions.setTableProps(String.format("VIEW_TTL=%d", viewTTL));
+
+        // Define the test schema.
+        final SchemaBuilder schemaBuilder = new SchemaBuilder(getUrl());
+        schemaBuilder
+                .withTableOptions(tableOptions)
+                .withTenantViewOptions(tenantViewOptions)
+                .build();
+
+        // Define the test data.
+        DataSupplier dataSupplier = new DataSupplier() {
+
+            @Override public List<Object> getValues(int rowIndex) {
+                Random rnd = new Random();
+                String zid = String.format("00A0y000%07d", rowIndex);
+                String col7 = String.format("g%05d", rowIndex + 
rnd.nextInt(MAX_ROWS));
+                String col8 = String.format("h%05d", rowIndex + 
rnd.nextInt(MAX_ROWS));
+                String col9 = String.format("i%05d", rowIndex + 
rnd.nextInt(MAX_ROWS));
+                return Lists.newArrayList(new Object[] { zid, col7, col8, col9 
});
+            }
+        };
+
+        // Create a test data reader/writer for the above schema.
+        DataWriter dataWriter = new BasicDataWriter();
+        DataReader dataReader = new BasicDataReader();
+
+        List<String> columns = Lists.newArrayList("ZID", "COL7", "COL8", 
"COL9");
+        List<String> rowKeyColumns = Lists.newArrayList("ZID");
+        String
+                tenantConnectUrl =
+                getUrl() + ';' + TENANT_ID_ATTRIB + '=' + 
schemaBuilder.getDataOptions()
+                        .getTenantId();
+        try (Connection writeConnection = DriverManager
+                .getConnection(tenantConnectUrl)) {
+            writeConnection.setAutoCommit(true);
+            dataWriter.setConnection(writeConnection);
+            dataWriter.setDataSupplier(dataSupplier);
+            dataWriter.setUpsertColumns(columns);
+            dataWriter.setRowKeyColumns(rowKeyColumns);
+            
dataWriter.setTargetEntity(schemaBuilder.getEntityTenantViewName());
+
+            dataReader.setValidationColumns(columns);
+            dataReader.setRowKeyColumns(rowKeyColumns);
+            dataReader.setDML(String.format("SELECT %s from %s",
+                    Joiner.on(",").join(columns),
+                    schemaBuilder.getEntityTenantViewName()));
+            
dataReader.setTargetEntity(schemaBuilder.getEntityTenantViewName());
+
+            // Validate data before and after ttl expiration.
+            upsertDataAndRunValidations(viewTTL, DEFAULT_NUM_ROWS, dataWriter,
+                    dataReader, schemaBuilder);
+        }
+    }
+
+    @Test public void testWithSQLUsingIndexWithCoveredColsUpdates() throws 
Exception {
+
+        long viewTTL = 10000;
+        TableOptions tableOptions = TableOptions.withDefaults();
+        tableOptions.getTableColumns().clear();
+        tableOptions.getTableColumnTypes().clear();
+
+        GlobalViewOptions
+                globalViewOptions =
+                SchemaBuilder.GlobalViewOptions.withDefaults();
+        globalViewOptions.setTableProps(String.format("VIEW_TTL=%d", viewTTL));
+
+        GlobalViewIndexOptions
+                globalViewIndexOptions =
+                SchemaBuilder.GlobalViewIndexOptions.withDefaults();
+        globalViewIndexOptions.setLocal(false);
+
+        TenantViewOptions tenantViewOptions = new TenantViewOptions();
+        tenantViewOptions
+                .setTenantViewColumns(asList("ZID", "COL7", "COL8", "COL9"));
+        tenantViewOptions
+                .setTenantViewColumnTypes(asList("CHAR(15)", "VARCHAR", 
"VARCHAR", "VARCHAR"));
+
+        tenantViewOptions.setTableProps(String.format("VIEW_TTL=%d", viewTTL));
+
+        OtherOptions testCaseWhenAllCFMatchAndAllDefault = new OtherOptions();
+        
testCaseWhenAllCFMatchAndAllDefault.setTestName("testCaseWhenAllCFMatchAndAllDefault");
+        testCaseWhenAllCFMatchAndAllDefault
+                .setTableCFs(Lists.newArrayList((String) null, null, null));
+        testCaseWhenAllCFMatchAndAllDefault
+                .setGlobalViewCFs(Lists.newArrayList((String) null, null, 
null));
+        
testCaseWhenAllCFMatchAndAllDefault.setTenantViewCFs(Lists.newArrayList((String)null,
 null, null, null));
+
+        // Define the test schema.
+        final SchemaBuilder schemaBuilder = new SchemaBuilder(getUrl());
+        schemaBuilder
+                .withTableOptions(tableOptions)
+                .withGlobalViewOptions(globalViewOptions)
+                .withGlobalViewIndexOptions(globalViewIndexOptions)
+                .withTenantViewOptions(tenantViewOptions)
+                .withOtherOptions(testCaseWhenAllCFMatchAndAllDefault)
+                .build();
+
+
+        // Define the test data.
+        final List<String> outerCol4s = Lists.newArrayList();
+        DataSupplier dataSupplier = new DataSupplier() {
+            String col4ForWhereClause;
+
+            @Override public List<Object> getValues(int rowIndex) {
+                Random rnd = new Random();
+                String id = String.format("00A0y000%07d", rowIndex);
+                String zid = String.format("00B0y000%07d", rowIndex);
+                String
+                        col4 =
+                        String.format("d%05d", rowIndex + 
rnd.nextInt(MAX_ROWS));
+
+                // Store the col4 data to be used later in a where clause
+                outerCol4s.add(col4);
+                String
+                        col5 =
+                        String.format("e%05d", rowIndex + 
rnd.nextInt(MAX_ROWS));
+                String
+                        col6 =
+                        String.format("f%05d", rowIndex + 
rnd.nextInt(MAX_ROWS));
+                String
+                        col7 =
+                        String.format("g%05d", rowIndex + 
rnd.nextInt(MAX_ROWS));
+                String
+                        col8 =
+                        String.format("h%05d", rowIndex + 
rnd.nextInt(MAX_ROWS));
+                String
+                        col9 =
+                        String.format("i%05d", rowIndex + 
rnd.nextInt(MAX_ROWS));
+
+                return Lists.newArrayList(
+                        new Object[] { id, zid, col4, col5, col6, col7,
+                                col8, col9 });
+            }
+        };
+
+        // Create a test data reader/writer for the above schema.
+        DataWriter dataWriter = new BasicDataWriter();
+        DataReader dataReader = new BasicDataReader();
+
+        List<String> columns = Lists.newArrayList("ID", "ZID",
+                "COL4", "COL5", "COL6", "COL7", "COL8", "COL9");
+        List<String> rowKeyColumns = Lists.newArrayList("COL6");
+        String
+                tenantConnectUrl =
+                getUrl() + ';' + TENANT_ID_ATTRIB + '=' + 
schemaBuilder.getDataOptions()
+                        .getTenantId();
+        try (Connection writeConnection = DriverManager
+                .getConnection(tenantConnectUrl)) {
+            writeConnection.setAutoCommit(true);
+            dataWriter.setConnection(writeConnection);
+            dataWriter.setDataSupplier(dataSupplier);
+            dataWriter.setUpsertColumns(columns);
+            dataWriter.setRowKeyColumns(rowKeyColumns);
+            
dataWriter.setTargetEntity(schemaBuilder.getEntityTenantViewName());
+
+            // Upsert data for validation
+            upsertData(dataWriter, DEFAULT_NUM_ROWS);
+
+            dataReader.setValidationColumns(rowKeyColumns);
+            dataReader.setRowKeyColumns(rowKeyColumns);
+            dataReader.setDML(String.format("SELECT col6 from %s where col4 = 
'%s'",
+                    schemaBuilder.getEntityTenantViewName(), 
outerCol4s.get(1)));
+            
dataReader.setTargetEntity(schemaBuilder.getEntityTenantViewName());
+
+            // Validate data before and after ttl expiration.
+            validateExpiredRowsAreNotReturnedUsingCounts(viewTTL, dataReader, 
schemaBuilder);
+        }
+    }
+
+    /**
+     * Ensure/validate that empty columns for the index are still updated even 
when covered columns
+     * are not updated.
+     * @throws Exception
+     */
+    @Test public void testWithSQLUsingIndexAndNoCoveredColsUpdates() throws 
Exception {
+
+        long viewTTL = 10000;
+        TableOptions tableOptions = TableOptions.withDefaults();
+        tableOptions.getTableColumns().clear();
+        tableOptions.getTableColumnTypes().clear();
+
+        GlobalViewOptions
+                globalViewOptions =
+                SchemaBuilder.GlobalViewOptions.withDefaults();
+        globalViewOptions.setTableProps(String.format("VIEW_TTL=%d", viewTTL));
+
+        GlobalViewIndexOptions
+                globalViewIndexOptions =
+                SchemaBuilder.GlobalViewIndexOptions.withDefaults();
+        globalViewIndexOptions.setLocal(false);
+
+        TenantViewOptions tenantViewOptions = new TenantViewOptions();
+        tenantViewOptions
+                .setTenantViewColumns(asList("ZID", "COL7", "COL8", "COL9"));
+        tenantViewOptions
+                .setTenantViewColumnTypes(asList("CHAR(15)", "VARCHAR", 
"VARCHAR", "VARCHAR"));
+
+        tenantViewOptions.setTableProps(String.format("VIEW_TTL=%d", viewTTL));
+
+        OtherOptions testCaseWhenAllCFMatchAndAllDefault = new OtherOptions();
+        
testCaseWhenAllCFMatchAndAllDefault.setTestName("testCaseWhenAllCFMatchAndAllDefault");
+        testCaseWhenAllCFMatchAndAllDefault
+                .setTableCFs(Lists.newArrayList((String) null, null, null));
+        testCaseWhenAllCFMatchAndAllDefault
+                .setGlobalViewCFs(Lists.newArrayList((String) null, null, 
null));
+        
testCaseWhenAllCFMatchAndAllDefault.setTenantViewCFs(Lists.newArrayList((String)null,
 null, null, null));
+
+        // Define the test schema.
+        final SchemaBuilder schemaBuilder = new SchemaBuilder(getUrl());
+        schemaBuilder
+                .withTableOptions(tableOptions)
+                .withGlobalViewOptions(globalViewOptions)
+                .withGlobalViewIndexOptions(globalViewIndexOptions)
+                .withTenantViewOptions(tenantViewOptions)
+                .withOtherOptions(testCaseWhenAllCFMatchAndAllDefault)
+                .build();
+
+
+        // Define the test data.
+        final List<String> outerCol4s = Lists.newArrayList();
+        DataSupplier dataSupplier = new DataSupplier() {
+
+            @Override public List<Object> getValues(int rowIndex) {
+                Random rnd = new Random();
+                String id = String.format("00A0y000%07d", rowIndex);
+                String zid = String.format("00B0y000%07d", rowIndex);
+                String
+                        col4 =
+                        String.format("d%05d", rowIndex + 
rnd.nextInt(MAX_ROWS));
+
+                // Store the col4 data to be used later in a where clause
+                outerCol4s.add(col4);
+                String
+                        col5 =
+                        String.format("e%05d", rowIndex + 
rnd.nextInt(MAX_ROWS));
+                String
+                        col6 =
+                        String.format("f%05d", rowIndex + 
rnd.nextInt(MAX_ROWS));
+                String
+                        col7 =
+                        String.format("g%05d", rowIndex + 
rnd.nextInt(MAX_ROWS));
+                String
+                        col8 =
+                        String.format("h%05d", rowIndex + 
rnd.nextInt(MAX_ROWS));
+                String
+                        col9 =
+                        String.format("i%05d", rowIndex + 
rnd.nextInt(MAX_ROWS));
+
+                return Lists.newArrayList(
+                        new Object[] { id, zid, col4, col5, col6, col7,
+                                col8, col9 });
+            }
+        };
+
+        // Create a test data reader/writer for the above schema.
+        DataWriter dataWriter = new BasicDataWriter();
+        DataReader dataReader = new BasicDataReader();
+
+        List<String> columns = Lists.newArrayList("ID", "ZID",
+                "COL4", "COL5", "COL6", "COL7", "COL8", "COL9");
+        List<String> nonCoveredColumns = Lists.newArrayList("ID", "ZID",
+                "COL5", "COL7", "COL8", "COL9");
+        List<String> rowKeyColumns = Lists.newArrayList("COL6");
+        String
+                tenantConnectUrl =
+                getUrl() + ';' + TENANT_ID_ATTRIB + '=' + 
schemaBuilder.getDataOptions()
+                        .getTenantId();
+        try (Connection writeConnection = DriverManager
+                .getConnection(tenantConnectUrl)) {
+            writeConnection.setAutoCommit(true);
+            dataWriter.setConnection(writeConnection);
+            dataWriter.setDataSupplier(dataSupplier);
+            dataWriter.setUpsertColumns(columns);
+            dataWriter.setRowKeyColumns(rowKeyColumns);
+            
dataWriter.setTargetEntity(schemaBuilder.getEntityTenantViewName());
+
+            // Upsert data for validation
+            upsertData(dataWriter, DEFAULT_NUM_ROWS);
+
+            dataReader.setValidationColumns(rowKeyColumns);
+            dataReader.setRowKeyColumns(rowKeyColumns);
+            dataReader.setDML(String.format("SELECT col6 from %s where col4 = 
'%s'",
+                    schemaBuilder.getEntityTenantViewName(), 
outerCol4s.get(1)));
+            
dataReader.setTargetEntity(schemaBuilder.getEntityTenantViewName());
+
+            // Validate data before and after ttl expiration.
+            validateExpiredRowsAreNotReturnedUsingCounts(viewTTL, dataReader, 
schemaBuilder);
+
+
+            // Now update the above data but not modifying the covered columns.
+            // Ensure/validate that empty columns for the index are still 
updated.
+
+            // Data supplier where covered and included (col4 and col6) 
columns are not updated.
+            DataSupplier dataSupplierForNonCoveredColumns = new DataSupplier() 
{
+
+                @Override public List<Object> getValues(int rowIndex) {
+                    Random rnd = new Random();
+                    String id = String.format("00A0y000%07d", rowIndex);
+                    String zid = String.format("00B0y000%07d", rowIndex);
+                    String
+                            col5 =
+                            String.format("e%05d", rowIndex + 
rnd.nextInt(MAX_ROWS));
+                    String
+                            col7 =
+                            String.format("g%05d", rowIndex + 
rnd.nextInt(MAX_ROWS));
+                    String
+                            col8 =
+                            String.format("h%05d", rowIndex + 
rnd.nextInt(MAX_ROWS));
+                    String
+                            col9 =
+                            String.format("i%05d", rowIndex + 
rnd.nextInt(MAX_ROWS));
+
+                    return Lists.newArrayList(
+                            new Object[] { id, zid, col5, col7,
+                                    col8, col9 });
+                }
+            };
+
+            // Upsert data for validation with non covered columns
+            dataWriter.setDataSupplier(dataSupplierForNonCoveredColumns);
+            dataWriter.setUpsertColumns(nonCoveredColumns);
+            upsertData(dataWriter, DEFAULT_NUM_ROWS);
+
+            List<String> rowKeyColumns1 = Lists.newArrayList("ID", "COL6");
+            dataReader.setValidationColumns(rowKeyColumns1);
+            dataReader.setRowKeyColumns(rowKeyColumns1);
+            dataReader.setDML(String.format("SELECT id, col6 from %s where 
col4 = '%s'",
+                    schemaBuilder.getEntityTenantViewName(), 
outerCol4s.get(1)));
+
+            // Validate data before and after ttl expiration.
+            validateExpiredRowsAreNotReturnedUsingCounts(viewTTL, dataReader, 
schemaBuilder);
+
+        }
+    }
+
+    @Test public void testWithVariousSQLs() throws Exception {
+
+        long viewTTL = 10000;
+        TableOptions tableOptions = TableOptions.withDefaults();
+        tableOptions.getTableColumns().clear();
+        tableOptions.getTableColumnTypes().clear();
+
+        GlobalViewOptions
+                globalViewOptions =
+                SchemaBuilder.GlobalViewOptions.withDefaults();
+        globalViewOptions.setTableProps(String.format("VIEW_TTL=%d", viewTTL));
+
+        GlobalViewIndexOptions
+                globalViewIndexOptions =
+                SchemaBuilder.GlobalViewIndexOptions.withDefaults();
+        globalViewIndexOptions.setLocal(false);
+
+        TenantViewOptions tenantViewOptions = new TenantViewOptions();
+        tenantViewOptions
+                .setTenantViewColumns(asList("ZID", "COL7", "COL8", "COL9"));
+        tenantViewOptions
+                .setTenantViewColumnTypes(asList("CHAR(15)", "VARCHAR", 
"VARCHAR", "VARCHAR"));
+
+        tenantViewOptions.setTableProps(String.format("VIEW_TTL=%d", viewTTL));
+
+        OtherOptions testCaseWhenAllCFMatchAndAllDefault = new OtherOptions();
+        
testCaseWhenAllCFMatchAndAllDefault.setTestName("testCaseWhenAllCFMatchAndAllDefault");
+        testCaseWhenAllCFMatchAndAllDefault
+                .setTableCFs(Lists.newArrayList((String) null, null, null));
+        testCaseWhenAllCFMatchAndAllDefault
+                .setGlobalViewCFs(Lists.newArrayList((String) null, null, 
null));
+        
testCaseWhenAllCFMatchAndAllDefault.setTenantViewCFs(Lists.newArrayList((String)null,
 null, null, null));
+
+        // Define the test schema.
+        final SchemaBuilder schemaBuilder = new SchemaBuilder(getUrl());
+        schemaBuilder
+                .withTableOptions(tableOptions)
+                .withGlobalViewOptions(globalViewOptions)
+                .withGlobalViewIndexOptions(globalViewIndexOptions)
+                .withTenantViewOptions(tenantViewOptions)
+                .withOtherOptions(testCaseWhenAllCFMatchAndAllDefault)
+                .build();
+
+
+        // Define the test data.
+        final String groupById = String.format("00A0y000%07d", 0);
+        DataSupplier dataSupplier = new DataSupplier() {
+
+            @Override public List<Object> getValues(int rowIndex) {
+                Random rnd = new Random();
+                String zid = String.format("00B0y000%07d", rowIndex);
+                String
+                        col4 =
+                        String.format("d%05d", rowIndex + 
rnd.nextInt(MAX_ROWS));
+                String
+                        col5 =
+                        String.format("e%05d", rowIndex + 
rnd.nextInt(MAX_ROWS));
+                String
+                        col6 =
+                        String.format("f%05d", rowIndex + 
rnd.nextInt(MAX_ROWS));
+                String
+                        col7 =
+                        String.format("g%05d", rowIndex + 
rnd.nextInt(MAX_ROWS));
+                String
+                        col8 =
+                        String.format("h%05d", rowIndex + 
rnd.nextInt(MAX_ROWS));
+                String
+                        col9 =
+                        String.format("i%05d", rowIndex + 
rnd.nextInt(MAX_ROWS));
+
+                return Lists.newArrayList(
+                        new Object[] { groupById, zid, col4, col5, col6, col7,
+                                col8, col9 });
+            }
+        };
+
+        // Create a test data reader/writer for the above schema.
+        DataWriter dataWriter = new BasicDataWriter();
+        List<String> columns = Lists.newArrayList("ID", "ZID",
+                "COL4", "COL5", "COL6", "COL7", "COL8", "COL9");
+        List<String> rowKeyColumns = Lists.newArrayList("ID", "ZID");
+        String
+                tenantConnectUrl =
+                getUrl() + ';' + TENANT_ID_ATTRIB + '=' + 
schemaBuilder.getDataOptions()
+                        .getTenantId();
+        try (Connection writeConnection = DriverManager
+                .getConnection(tenantConnectUrl)) {
+            writeConnection.setAutoCommit(true);
+            dataWriter.setConnection(writeConnection);
+            dataWriter.setDataSupplier(dataSupplier);
+            dataWriter.setUpsertColumns(columns);
+            dataWriter.setRowKeyColumns(rowKeyColumns);
+            
dataWriter.setTargetEntity(schemaBuilder.getEntityTenantViewName());
+            upsertData(dataWriter, DEFAULT_NUM_ROWS);
+
+            // Case : count(1) sql
+            DataReader dataReader = new BasicDataReader();
+            dataReader.setValidationColumns(Arrays.asList("num_rows"));
+            dataReader.setRowKeyColumns(Arrays.asList("num_rows"));
+            dataReader.setDML(String.format("SELECT count(1) as num_rows from 
%s HAVING count(1) > 0",
+                    schemaBuilder.getEntityTenantViewName()));
+            
dataReader.setTargetEntity(schemaBuilder.getEntityTenantViewName());
+
+            // Validate data before and after ttl expiration.
+            validateExpiredRowsAreNotReturnedUsingCounts(viewTTL, dataReader, 
schemaBuilder);
+
+            // Case : group by sql
+            dataReader.setValidationColumns(Arrays.asList("num_rows"));
+            dataReader.setRowKeyColumns(Arrays.asList("num_rows"));
+            dataReader.setDML(String.format("SELECT count(1) as num_rows from 
%s GROUP BY ID HAVING count(1) > 0",
+                    schemaBuilder.getEntityTenantViewName(),
+                    groupById));
+
+            
dataReader.setTargetEntity(schemaBuilder.getEntityTenantViewName());
+
+            // Validate data before and after ttl expiration.
+            validateExpiredRowsAreNotReturnedUsingCounts(viewTTL, dataReader, 
schemaBuilder);
+        }
+    }
+
+    @Test public void testWithVariousSQLsForMultipleTenants() throws Exception 
{
+
+        long viewTTL = 10000;
+        TableOptions tableOptions = TableOptions.withDefaults();
+        tableOptions.getTableColumns().clear();
+        tableOptions.getTableColumnTypes().clear();
+
+        GlobalViewOptions
+                globalViewOptions =
+                SchemaBuilder.GlobalViewOptions.withDefaults();
+        globalViewOptions.setTableProps(String.format("VIEW_TTL=%d", viewTTL));
+
+        GlobalViewIndexOptions
+                globalViewIndexOptions =
+                SchemaBuilder.GlobalViewIndexOptions.withDefaults();
+        globalViewIndexOptions.setLocal(false);
+
+        TenantViewOptions tenantViewOptions = new TenantViewOptions();
+        tenantViewOptions
+                .setTenantViewColumns(asList("ZID", "COL7", "COL8", "COL9"));
+        tenantViewOptions
+                .setTenantViewColumnTypes(asList("CHAR(15)", "VARCHAR", 
"VARCHAR", "VARCHAR"));
+
+        tenantViewOptions.setTableProps(String.format("VIEW_TTL=%d", viewTTL));
+
+        OtherOptions testCaseWhenAllCFMatchAndAllDefault = new OtherOptions();
+        
testCaseWhenAllCFMatchAndAllDefault.setTestName("testCaseWhenAllCFMatchAndAllDefault");
+        testCaseWhenAllCFMatchAndAllDefault
+                .setTableCFs(Lists.newArrayList((String) null, null, null));
+        testCaseWhenAllCFMatchAndAllDefault
+                .setGlobalViewCFs(Lists.newArrayList((String) null, null, 
null));
+        
testCaseWhenAllCFMatchAndAllDefault.setTenantViewCFs(Lists.newArrayList((String)null,
 null, null, null));
+
+        final SchemaBuilder schemaBuilder = new SchemaBuilder(getUrl());
+        schemaBuilder
+                .withTableOptions(tableOptions)
+                .withGlobalViewOptions(globalViewOptions)
+                .withGlobalViewIndexOptions(globalViewIndexOptions)
+                .withTenantViewOptions(tenantViewOptions)
+                .withOtherOptions(testCaseWhenAllCFMatchAndAllDefault);
+
+        for (int tenant : Arrays.asList(new Integer[] {1, 2, 3})) {
+            // build schema for tenant
+            schemaBuilder.buildWithNewTenant();
+
+            // Define the test data.
+            final String groupById = String.format("00A0y000%07d", 0);
+            DataSupplier dataSupplier = new DataSupplier() {
+
+                @Override public List<Object> getValues(int rowIndex) {
+                    Random rnd = new Random();
+                    String zid = String.format("00B0y000%07d", rowIndex);
+                    String
+                            col4 =
+                            String.format("d%05d", rowIndex + 
rnd.nextInt(MAX_ROWS));
+                    String
+                            col5 =
+                            String.format("e%05d", rowIndex + 
rnd.nextInt(MAX_ROWS));
+                    String
+                            col6 =
+                            String.format("f%05d", rowIndex + 
rnd.nextInt(MAX_ROWS));
+                    String
+                            col7 =
+                            String.format("g%05d", rowIndex + 
rnd.nextInt(MAX_ROWS));
+                    String
+                            col8 =
+                            String.format("h%05d", rowIndex + 
rnd.nextInt(MAX_ROWS));
+                    String
+                            col9 =
+                            String.format("i%05d", rowIndex + 
rnd.nextInt(MAX_ROWS));
+
+                    return Lists.newArrayList(
+                            new Object[] { groupById, zid, col4, col5, col6, 
col7,
+                                    col8, col9 });
+                }
+            };
+
+            // Create a test data reader/writer for the above schema.
+            DataWriter dataWriter = new BasicDataWriter();
+            List<String> columns = Lists.newArrayList("ID", "ZID",
+                    "COL4", "COL5", "COL6", "COL7", "COL8", "COL9");
+            List<String> rowKeyColumns = Lists.newArrayList("ID", "ZID");
+            String
+                    tenantConnectUrl =
+                    getUrl() + ';' + TENANT_ID_ATTRIB + '=' + 
schemaBuilder.getDataOptions()
+                            .getTenantId();
+            try (Connection writeConnection = DriverManager
+                    .getConnection(tenantConnectUrl)) {
+                writeConnection.setAutoCommit(true);
+                dataWriter.setConnection(writeConnection);
+                dataWriter.setDataSupplier(dataSupplier);
+                dataWriter.setUpsertColumns(columns);
+                dataWriter.setRowKeyColumns(rowKeyColumns);
+                
dataWriter.setTargetEntity(schemaBuilder.getEntityTenantViewName());
+                upsertData(dataWriter, DEFAULT_NUM_ROWS);
+
+                // Case : count(1) sql
+                DataReader dataReader = new BasicDataReader();
+                dataReader.setValidationColumns(Arrays.asList("num_rows"));
+                dataReader.setRowKeyColumns(Arrays.asList("num_rows"));
+                dataReader.setDML(String.format("SELECT count(1) as num_rows 
from %s HAVING count(1) > 0",
+                        schemaBuilder.getEntityTenantViewName()));
+                
dataReader.setTargetEntity(schemaBuilder.getEntityTenantViewName());
+
+                // Validate data before and after ttl expiration.
+                validateExpiredRowsAreNotReturnedUsingCounts(viewTTL, 
dataReader, schemaBuilder);
+
+                // Case : group by sql
+                dataReader.setValidationColumns(Arrays.asList("num_rows"));
+                dataReader.setRowKeyColumns(Arrays.asList("num_rows"));
+                dataReader.setDML(String.format("SELECT count(1) as num_rows 
from %s GROUP BY ID HAVING count(1) > 0",
+                        schemaBuilder.getEntityTenantViewName()));
+
+                
dataReader.setTargetEntity(schemaBuilder.getEntityTenantViewName());
+
+                // Validate data before and after ttl expiration.
+                validateExpiredRowsAreNotReturnedUsingCounts(viewTTL, 
dataReader, schemaBuilder);
+            }
+        }
+    }
+
+    @Test public void testWithVariousSQLsForMultipleViews() throws Exception {
+
+        long viewTTL = 10000;
+        TableOptions tableOptions = TableOptions.withDefaults();
+        tableOptions.getTableColumns().clear();
+        tableOptions.getTableColumnTypes().clear();
+
+        TenantViewOptions tenantViewOptions = new TenantViewOptions();
+        tenantViewOptions
+                .setTenantViewColumns(asList("ZID", "COL7", "COL8", "COL9"));
+        tenantViewOptions
+                .setTenantViewColumnTypes(asList("CHAR(15)", "VARCHAR", 
"VARCHAR", "VARCHAR"));
+
+        tenantViewOptions.setTableProps(String.format("VIEW_TTL=%d", viewTTL));
+
+        OtherOptions testCaseWhenAllCFMatchAndAllDefault = new OtherOptions();
+        
testCaseWhenAllCFMatchAndAllDefault.setTestName("testCaseWhenAllCFMatchAndAllDefault");
+        testCaseWhenAllCFMatchAndAllDefault
+                .setTableCFs(Lists.newArrayList((String) null, null, null));
+        testCaseWhenAllCFMatchAndAllDefault
+                .setGlobalViewCFs(Lists.newArrayList((String) null, null, 
null));
+        
testCaseWhenAllCFMatchAndAllDefault.setTenantViewCFs(Lists.newArrayList((String)null,
 null, null, null));
+
+        final SchemaBuilder schemaBuilder = new SchemaBuilder(getUrl());
+        schemaBuilder
+                .withTableOptions(tableOptions)
+                .withTenantViewOptions(tenantViewOptions)
+                .withOtherOptions(testCaseWhenAllCFMatchAndAllDefault);
+
+        for (int view : Arrays.asList(new Integer[] {1, 2, 3})) {
+            // build schema for new view
+            schemaBuilder.buildNewView();
+
+            // Define the test data.
+            DataSupplier dataSupplier = new DataSupplier() {
+
+                @Override public List<Object> getValues(int rowIndex) {
+                    Random rnd = new Random();
+                    String zid = String.format("00B0y000%07d", rowIndex);
+                    String
+                            col7 =
+                            String.format("g%05d", rowIndex + 
rnd.nextInt(MAX_ROWS));
+                    String
+                            col8 =
+                            String.format("h%05d", rowIndex + 
rnd.nextInt(MAX_ROWS));
+                    String
+                            col9 =
+                            String.format("i%05d", rowIndex + 
rnd.nextInt(MAX_ROWS));
+
+                    return Lists.newArrayList(
+                            new Object[] { zid, col7, col8, col9 });
+                }
+            };
+
+            // Create a test data reader/writer for the above schema.
+            DataWriter dataWriter = new BasicDataWriter();
+            List<String> columns = Lists.newArrayList("ZID",
+                    "COL7", "COL8", "COL9");
+            List<String> rowKeyColumns = Lists.newArrayList("ZID");
+            String
+                    tenantConnectUrl =
+                    getUrl() + ';' + TENANT_ID_ATTRIB + '=' + 
schemaBuilder.getDataOptions()
+                            .getTenantId();
+            try (Connection writeConnection = DriverManager
+                    .getConnection(tenantConnectUrl)) {
+                writeConnection.setAutoCommit(true);
+                dataWriter.setConnection(writeConnection);
+                dataWriter.setDataSupplier(dataSupplier);
+                dataWriter.setUpsertColumns(columns);
+                dataWriter.setRowKeyColumns(rowKeyColumns);
+                
dataWriter.setTargetEntity(schemaBuilder.getEntityTenantViewName());
+                upsertData(dataWriter, DEFAULT_NUM_ROWS);
+
+                // Case : count(1) sql
+                DataReader dataReader = new BasicDataReader();
+                dataReader.setValidationColumns(Arrays.asList("num_rows"));
+                dataReader.setRowKeyColumns(Arrays.asList("num_rows"));
+                dataReader.setDML(String.format("SELECT count(1) as num_rows 
from %s HAVING count(1) > 0",
+                        schemaBuilder.getEntityTenantViewName()));
+                
dataReader.setTargetEntity(schemaBuilder.getEntityTenantViewName());
+
+                // Validate data before and after ttl expiration.
+                validateExpiredRowsAreNotReturnedUsingCounts(viewTTL, 
dataReader, schemaBuilder);
+
+                // Case : group by sql
+                dataReader.setValidationColumns(Arrays.asList("num_rows"));
+                dataReader.setRowKeyColumns(Arrays.asList("num_rows"));
+                dataReader.setDML(String.format("SELECT count(1) as num_rows 
from %s GROUP BY ZID HAVING count(1) > 0",
+                        schemaBuilder.getEntityTenantViewName()));
+
+                
dataReader.setTargetEntity(schemaBuilder.getEntityTenantViewName());
+
+                // Validate data before and after ttl expiration.
+                validateExpiredRowsAreNotReturnedUsingCounts(viewTTL, 
dataReader, schemaBuilder);
+            }
+        }
+    }
 
+    @Test
+    public void testWithTenantViewAndGlobalViewAndVariousOptions() throws 
Exception {
+        long viewTTL = 10000;
+
+        // Define the test schema
+        TableOptions tableOptions = TableOptions.withDefaults();
+
+        GlobalViewOptions
+                globalViewOptions =
+                SchemaBuilder.GlobalViewOptions.withDefaults();
+        globalViewOptions.setTableProps(String.format("VIEW_TTL=%d", viewTTL));
+
+        GlobalViewIndexOptions
+                globalViewIndexOptions =
+                GlobalViewIndexOptions.withDefaults();
+
+        TenantViewOptions tenantViewOptions = new TenantViewOptions();
+        tenantViewOptions
+                .setTenantViewColumns(asList("ZID", "COL7", "COL8", "COL9"));
+        tenantViewOptions
+                .setTenantViewColumnTypes(asList("CHAR(15)", "VARCHAR", 
"VARCHAR", "VARCHAR"));
+        tenantViewOptions.setTableProps(String.format("VIEW_TTL=%d", viewTTL));
+
+        TenantViewIndexOptions
+                tenantViewIndexOptions =
+                TenantViewIndexOptions.withDefaults();
+
+        OtherOptions testCaseWhenAllCFMatchAndAllDefault = new OtherOptions();
+        
testCaseWhenAllCFMatchAndAllDefault.setTestName("testCaseWhenAllCFMatchAndAllDefault");
+        testCaseWhenAllCFMatchAndAllDefault
+                .setTableCFs(Lists.newArrayList((String) null, null, null));
+        testCaseWhenAllCFMatchAndAllDefault
+                .setGlobalViewCFs(Lists.newArrayList((String) null, null, 
null));
+        
testCaseWhenAllCFMatchAndAllDefault.setTenantViewCFs(Lists.newArrayList((String)null,
 null, null, null));
+
+        for (String additionalProps : Lists
+                .newArrayList("COLUMN_ENCODED_BYTES=0", 
"DEFAULT_COLUMN_FAMILY='0'")) {
+
+            StringBuilder withTableProps = new StringBuilder();
+            
withTableProps.append("MULTI_TENANT=true,").append(additionalProps);
+
+            for (boolean isGlobalViewLocal : Lists.newArrayList(true, false)) {
+                for (boolean isTenantViewLocal : Lists.newArrayList(true, 
false)) {
+
+                    tableOptions.setTableProps(withTableProps.toString());
+                    globalViewIndexOptions.setLocal(isGlobalViewLocal);
+                    tenantViewIndexOptions.setLocal(isTenantViewLocal);
+                    OtherOptions otherOptions = 
testCaseWhenAllCFMatchAndAllDefault;
+
+                    final SchemaBuilder schemaBuilder = new 
SchemaBuilder(getUrl());
+                    schemaBuilder
+                            .withTableOptions(tableOptions)
+                            .withGlobalViewOptions(globalViewOptions)
+                            .withGlobalViewIndexOptions(globalViewIndexOptions)
+                            .withTenantViewOptions(tenantViewOptions)
+                            .withTenantViewIndexOptions(tenantViewIndexOptions)
+                            
.withOtherOptions(testCaseWhenAllCFMatchAndAllDefault)
+                            .buildWithNewTenant();
+
+                    // Define the test data.
+                    DataSupplier dataSupplier = new DataSupplier() {
+
+                        @Override public List<Object> getValues(int rowIndex) {

Review comment:
       Would like to keep the anonymous data supplier classes for readability 
and context. Many of the classes have different sets of columns. Would agree 
there are some duplicates. But let me know if you feel strongly.

##########
File path: phoenix-core/src/it/java/org/apache/phoenix/end2end/ViewTTLIT.java
##########
@@ -391,7 +402,1390 @@ public void testViewTTLWithAlterView() throws Exception {
         // Since the VIEW_TTL property values are not being overriden, we 
expect the TTL value to be different from the global view.
         assertSyscatHaveViewTTLRelatedColumns(tenantId, schemaName, 
tenantViewName, PTableType.VIEW.getSerializedValue(), 1000);
         assertSyscatHaveViewTTLRelatedColumns(tenantId, schemaName, 
indexOnTenantViewName, PTableType.INDEX.getSerializedValue(), 1000);
+    }
+
+    @Test
+    public void testViewTTLForLevelTwoViewWithNoIndexes() throws Exception {
+        long startTime = System.currentTimeMillis();
+
+        // Define the test schema.
+        // 1. Table with default columns => (ORG_ID, KP, COL1, COL2, COL3), PK 
=> (ORG_ID, KP)
+        // 2. GlobalView with default columns => (ID, COL4, COL5, COL6), PK => 
(ID)
+        // 3. Tenant with default columns => (ZID, COL7, COL8, COL9), PK => 
(ZID)
+        final SchemaBuilder schemaBuilder = new SchemaBuilder(getUrl());
+
+        TableOptions tableOptions = TableOptions.withDefaults();
+        tableOptions.setTableProps("COLUMN_ENCODED_BYTES=0,MULTI_TENANT=true");
+
+        SchemaBuilder.GlobalViewOptions
+                globalViewOptions = 
SchemaBuilder.GlobalViewOptions.withDefaults();
+        globalViewOptions.setTableProps("VIEW_TTL=300000");
+
+        TenantViewOptions tenantViewWithOverrideOptions = 
TenantViewOptions.withDefaults();
+        tenantViewWithOverrideOptions.setTableProps("VIEW_TTL=10000");
+        schemaBuilder
+                .withTableOptions(tableOptions)
+                .withGlobalViewOptions(globalViewOptions)
+                .withTenantViewOptions(tenantViewWithOverrideOptions)
+                .buildWithNewTenant();
+
+        String tenantId = schemaBuilder.getDataOptions().getTenantId();
+        String schemaName = 
stripQuotes(SchemaUtil.getSchemaNameFromFullName(schemaBuilder.getEntityTenantViewName()));
+        String globalViewName = 
stripQuotes(SchemaUtil.getTableNameFromFullName(schemaBuilder.getEntityGlobalViewName()));
+        String tenantViewName = 
stripQuotes(SchemaUtil.getTableNameFromFullName(schemaBuilder.getEntityTenantViewName()));
+
+        // Expected 2 rows - one for GlobalView, one for TenantView  each.
+        // Since the VIEW_TTL property values are being set, we expect the 
view header columns to show up in regular scans too.
+        
assertViewHeaderRowsHaveViewTTLRelatedCells(schemaBuilder.getTableOptions().getSchemaName(),
 startTime, false, 2);
+        assertSyscatHaveViewTTLRelatedColumns("", schemaName, globalViewName, 
PTableType.VIEW.getSerializedValue(), 300000);
+        // Since the VIEW_TTL property values are not being overriden, we 
expect the TTL value to be different from the global view.
+        assertSyscatHaveViewTTLRelatedColumns(tenantId, schemaName, 
tenantViewName, PTableType.VIEW.getSerializedValue(), 10000);
+
+        // Without override
+        startTime = System.currentTimeMillis();
+
+        TenantViewOptions tenantViewWithoutOverrideOptions = 
TenantViewOptions.withDefaults();
+        schemaBuilder
+                .withTableOptions(tableOptions)
+                .withGlobalViewOptions(globalViewOptions)
+                .withTenantViewOptions(tenantViewWithoutOverrideOptions)
+                .buildWithNewTenant();
+
+        tenantId = schemaBuilder.getDataOptions().getTenantId();
+        schemaName = 
stripQuotes(SchemaUtil.getSchemaNameFromFullName(schemaBuilder.getEntityTenantViewName()));
+        globalViewName = 
stripQuotes(SchemaUtil.getTableNameFromFullName(schemaBuilder.getEntityGlobalViewName()));
+        tenantViewName = 
stripQuotes(SchemaUtil.getTableNameFromFullName(schemaBuilder.getEntityTenantViewName()));
+
+        // Expected 1 rows - one for TenantView each.
+        // Since the VIEW_TTL property values are being set, we expect the 
view header columns to show up in regular scans too.
+        
assertViewHeaderRowsHaveViewTTLRelatedCells(schemaBuilder.getTableOptions().getSchemaName(),
 startTime, false, 1);
+        assertSyscatHaveViewTTLRelatedColumns("", schemaName, globalViewName, 
PTableType.VIEW.getSerializedValue(), 300000);
+        // Since the VIEW_TTL property values are not being overriden, we 
expect the TTL value to be same as the global view.
+        assertSyscatHaveViewTTLRelatedColumns(tenantId, schemaName, 
tenantViewName, PTableType.VIEW.getSerializedValue(), 300000);
+    }
+
+    @Test
+    public void testWithTenantViewAndNoGlobalView() throws Exception {
+
+        long viewTTL = 10000;
+        TableOptions tableOptions = TableOptions.withDefaults();
+        tableOptions.getTableColumns().clear();
+        tableOptions.getTableColumnTypes().clear();
+
+        TenantViewOptions tenantViewOptions = TenantViewOptions.withDefaults();
+        tenantViewOptions.setTableProps(String.format("VIEW_TTL=%d", viewTTL));
+
+        // Define the test schema.
+        final SchemaBuilder schemaBuilder = new SchemaBuilder(getUrl());
+        schemaBuilder
+                .withTableOptions(tableOptions)
+                .withTenantViewOptions(tenantViewOptions)
+                .build();
+
+        // Define the test data.
+        DataSupplier dataSupplier = new DataSupplier() {
+
+            @Override public List<Object> getValues(int rowIndex) {
+                Random rnd = new Random();
+                String zid = String.format("00A0y000%07d", rowIndex);
+                String col7 = String.format("g%05d", rowIndex + 
rnd.nextInt(MAX_ROWS));
+                String col8 = String.format("h%05d", rowIndex + 
rnd.nextInt(MAX_ROWS));
+                String col9 = String.format("i%05d", rowIndex + 
rnd.nextInt(MAX_ROWS));
+                return Lists.newArrayList(new Object[] { zid, col7, col8, col9 
});
+            }
+        };
+
+        // Create a test data reader/writer for the above schema.
+        DataWriter dataWriter = new BasicDataWriter();
+        DataReader dataReader = new BasicDataReader();
+
+        List<String> columns = Lists.newArrayList("ZID", "COL7", "COL8", 
"COL9");
+        List<String> rowKeyColumns = Lists.newArrayList("ZID");
+        String
+                tenantConnectUrl =
+                getUrl() + ';' + TENANT_ID_ATTRIB + '=' + 
schemaBuilder.getDataOptions()
+                        .getTenantId();
+        try (Connection writeConnection = DriverManager
+                .getConnection(tenantConnectUrl)) {
+            writeConnection.setAutoCommit(true);
+            dataWriter.setConnection(writeConnection);
+            dataWriter.setDataSupplier(dataSupplier);
+            dataWriter.setUpsertColumns(columns);
+            dataWriter.setRowKeyColumns(rowKeyColumns);
+            
dataWriter.setTargetEntity(schemaBuilder.getEntityTenantViewName());
+
+            dataReader.setValidationColumns(columns);
+            dataReader.setRowKeyColumns(rowKeyColumns);
+            dataReader.setDML(String.format("SELECT %s from %s",
+                    Joiner.on(",").join(columns),
+                    schemaBuilder.getEntityTenantViewName()));
+            
dataReader.setTargetEntity(schemaBuilder.getEntityTenantViewName());
+
+            // Validate data before and after ttl expiration.
+            upsertDataAndRunValidations(viewTTL, DEFAULT_NUM_ROWS, dataWriter,
+                    dataReader, schemaBuilder);
+        }
+    }
+
+    @Test public void testWithSQLUsingIndexWithCoveredColsUpdates() throws 
Exception {
+
+        long viewTTL = 10000;
+        TableOptions tableOptions = TableOptions.withDefaults();
+        tableOptions.getTableColumns().clear();
+        tableOptions.getTableColumnTypes().clear();
+
+        GlobalViewOptions
+                globalViewOptions =
+                SchemaBuilder.GlobalViewOptions.withDefaults();
+        globalViewOptions.setTableProps(String.format("VIEW_TTL=%d", viewTTL));
+
+        GlobalViewIndexOptions
+                globalViewIndexOptions =
+                SchemaBuilder.GlobalViewIndexOptions.withDefaults();
+        globalViewIndexOptions.setLocal(false);
+
+        TenantViewOptions tenantViewOptions = new TenantViewOptions();
+        tenantViewOptions
+                .setTenantViewColumns(asList("ZID", "COL7", "COL8", "COL9"));
+        tenantViewOptions
+                .setTenantViewColumnTypes(asList("CHAR(15)", "VARCHAR", 
"VARCHAR", "VARCHAR"));
+
+        tenantViewOptions.setTableProps(String.format("VIEW_TTL=%d", viewTTL));
+
+        OtherOptions testCaseWhenAllCFMatchAndAllDefault = new OtherOptions();
+        
testCaseWhenAllCFMatchAndAllDefault.setTestName("testCaseWhenAllCFMatchAndAllDefault");
+        testCaseWhenAllCFMatchAndAllDefault
+                .setTableCFs(Lists.newArrayList((String) null, null, null));
+        testCaseWhenAllCFMatchAndAllDefault
+                .setGlobalViewCFs(Lists.newArrayList((String) null, null, 
null));
+        
testCaseWhenAllCFMatchAndAllDefault.setTenantViewCFs(Lists.newArrayList((String)null,
 null, null, null));
+
+        // Define the test schema.
+        final SchemaBuilder schemaBuilder = new SchemaBuilder(getUrl());
+        schemaBuilder
+                .withTableOptions(tableOptions)
+                .withGlobalViewOptions(globalViewOptions)
+                .withGlobalViewIndexOptions(globalViewIndexOptions)
+                .withTenantViewOptions(tenantViewOptions)
+                .withOtherOptions(testCaseWhenAllCFMatchAndAllDefault)
+                .build();
+
+
+        // Define the test data.
+        final List<String> outerCol4s = Lists.newArrayList();
+        DataSupplier dataSupplier = new DataSupplier() {
+            String col4ForWhereClause;
+
+            @Override public List<Object> getValues(int rowIndex) {
+                Random rnd = new Random();
+                String id = String.format("00A0y000%07d", rowIndex);
+                String zid = String.format("00B0y000%07d", rowIndex);
+                String
+                        col4 =
+                        String.format("d%05d", rowIndex + 
rnd.nextInt(MAX_ROWS));
+
+                // Store the col4 data to be used later in a where clause
+                outerCol4s.add(col4);
+                String
+                        col5 =
+                        String.format("e%05d", rowIndex + 
rnd.nextInt(MAX_ROWS));
+                String
+                        col6 =
+                        String.format("f%05d", rowIndex + 
rnd.nextInt(MAX_ROWS));
+                String
+                        col7 =
+                        String.format("g%05d", rowIndex + 
rnd.nextInt(MAX_ROWS));
+                String
+                        col8 =
+                        String.format("h%05d", rowIndex + 
rnd.nextInt(MAX_ROWS));
+                String
+                        col9 =
+                        String.format("i%05d", rowIndex + 
rnd.nextInt(MAX_ROWS));
+
+                return Lists.newArrayList(
+                        new Object[] { id, zid, col4, col5, col6, col7,
+                                col8, col9 });
+            }
+        };
+
+        // Create a test data reader/writer for the above schema.
+        DataWriter dataWriter = new BasicDataWriter();
+        DataReader dataReader = new BasicDataReader();
+
+        List<String> columns = Lists.newArrayList("ID", "ZID",
+                "COL4", "COL5", "COL6", "COL7", "COL8", "COL9");
+        List<String> rowKeyColumns = Lists.newArrayList("COL6");
+        String
+                tenantConnectUrl =
+                getUrl() + ';' + TENANT_ID_ATTRIB + '=' + 
schemaBuilder.getDataOptions()
+                        .getTenantId();
+        try (Connection writeConnection = DriverManager
+                .getConnection(tenantConnectUrl)) {
+            writeConnection.setAutoCommit(true);
+            dataWriter.setConnection(writeConnection);
+            dataWriter.setDataSupplier(dataSupplier);
+            dataWriter.setUpsertColumns(columns);
+            dataWriter.setRowKeyColumns(rowKeyColumns);
+            
dataWriter.setTargetEntity(schemaBuilder.getEntityTenantViewName());
+
+            // Upsert data for validation
+            upsertData(dataWriter, DEFAULT_NUM_ROWS);
+
+            dataReader.setValidationColumns(rowKeyColumns);
+            dataReader.setRowKeyColumns(rowKeyColumns);
+            dataReader.setDML(String.format("SELECT col6 from %s where col4 = 
'%s'",
+                    schemaBuilder.getEntityTenantViewName(), 
outerCol4s.get(1)));
+            
dataReader.setTargetEntity(schemaBuilder.getEntityTenantViewName());
+
+            // Validate data before and after ttl expiration.
+            validateExpiredRowsAreNotReturnedUsingCounts(viewTTL, dataReader, 
schemaBuilder);
+        }
+    }
+
+    /**
+     * Ensure/validate that empty columns for the index are still updated even 
when covered columns
+     * are not updated.
+     * @throws Exception
+     */
+    @Test public void testWithSQLUsingIndexAndNoCoveredColsUpdates() throws 
Exception {
+
+        long viewTTL = 10000;
+        TableOptions tableOptions = TableOptions.withDefaults();
+        tableOptions.getTableColumns().clear();
+        tableOptions.getTableColumnTypes().clear();
+
+        GlobalViewOptions
+                globalViewOptions =
+                SchemaBuilder.GlobalViewOptions.withDefaults();
+        globalViewOptions.setTableProps(String.format("VIEW_TTL=%d", viewTTL));
+
+        GlobalViewIndexOptions
+                globalViewIndexOptions =
+                SchemaBuilder.GlobalViewIndexOptions.withDefaults();
+        globalViewIndexOptions.setLocal(false);
+
+        TenantViewOptions tenantViewOptions = new TenantViewOptions();
+        tenantViewOptions
+                .setTenantViewColumns(asList("ZID", "COL7", "COL8", "COL9"));
+        tenantViewOptions
+                .setTenantViewColumnTypes(asList("CHAR(15)", "VARCHAR", 
"VARCHAR", "VARCHAR"));
+
+        tenantViewOptions.setTableProps(String.format("VIEW_TTL=%d", viewTTL));
+
+        OtherOptions testCaseWhenAllCFMatchAndAllDefault = new OtherOptions();
+        
testCaseWhenAllCFMatchAndAllDefault.setTestName("testCaseWhenAllCFMatchAndAllDefault");
+        testCaseWhenAllCFMatchAndAllDefault
+                .setTableCFs(Lists.newArrayList((String) null, null, null));
+        testCaseWhenAllCFMatchAndAllDefault
+                .setGlobalViewCFs(Lists.newArrayList((String) null, null, 
null));
+        
testCaseWhenAllCFMatchAndAllDefault.setTenantViewCFs(Lists.newArrayList((String)null,
 null, null, null));
+
+        // Define the test schema.
+        final SchemaBuilder schemaBuilder = new SchemaBuilder(getUrl());
+        schemaBuilder
+                .withTableOptions(tableOptions)
+                .withGlobalViewOptions(globalViewOptions)
+                .withGlobalViewIndexOptions(globalViewIndexOptions)
+                .withTenantViewOptions(tenantViewOptions)
+                .withOtherOptions(testCaseWhenAllCFMatchAndAllDefault)
+                .build();
+
+
+        // Define the test data.
+        final List<String> outerCol4s = Lists.newArrayList();
+        DataSupplier dataSupplier = new DataSupplier() {
+
+            @Override public List<Object> getValues(int rowIndex) {
+                Random rnd = new Random();
+                String id = String.format("00A0y000%07d", rowIndex);
+                String zid = String.format("00B0y000%07d", rowIndex);
+                String
+                        col4 =
+                        String.format("d%05d", rowIndex + 
rnd.nextInt(MAX_ROWS));
+
+                // Store the col4 data to be used later in a where clause
+                outerCol4s.add(col4);
+                String
+                        col5 =
+                        String.format("e%05d", rowIndex + 
rnd.nextInt(MAX_ROWS));
+                String
+                        col6 =
+                        String.format("f%05d", rowIndex + 
rnd.nextInt(MAX_ROWS));
+                String
+                        col7 =
+                        String.format("g%05d", rowIndex + 
rnd.nextInt(MAX_ROWS));
+                String
+                        col8 =
+                        String.format("h%05d", rowIndex + 
rnd.nextInt(MAX_ROWS));
+                String
+                        col9 =
+                        String.format("i%05d", rowIndex + 
rnd.nextInt(MAX_ROWS));
+
+                return Lists.newArrayList(
+                        new Object[] { id, zid, col4, col5, col6, col7,
+                                col8, col9 });
+            }
+        };
+
+        // Create a test data reader/writer for the above schema.
+        DataWriter dataWriter = new BasicDataWriter();
+        DataReader dataReader = new BasicDataReader();
+
+        List<String> columns = Lists.newArrayList("ID", "ZID",
+                "COL4", "COL5", "COL6", "COL7", "COL8", "COL9");
+        List<String> nonCoveredColumns = Lists.newArrayList("ID", "ZID",
+                "COL5", "COL7", "COL8", "COL9");
+        List<String> rowKeyColumns = Lists.newArrayList("COL6");
+        String
+                tenantConnectUrl =
+                getUrl() + ';' + TENANT_ID_ATTRIB + '=' + 
schemaBuilder.getDataOptions()
+                        .getTenantId();
+        try (Connection writeConnection = DriverManager
+                .getConnection(tenantConnectUrl)) {
+            writeConnection.setAutoCommit(true);
+            dataWriter.setConnection(writeConnection);
+            dataWriter.setDataSupplier(dataSupplier);
+            dataWriter.setUpsertColumns(columns);
+            dataWriter.setRowKeyColumns(rowKeyColumns);
+            
dataWriter.setTargetEntity(schemaBuilder.getEntityTenantViewName());
+
+            // Upsert data for validation
+            upsertData(dataWriter, DEFAULT_NUM_ROWS);
+
+            dataReader.setValidationColumns(rowKeyColumns);
+            dataReader.setRowKeyColumns(rowKeyColumns);
+            dataReader.setDML(String.format("SELECT col6 from %s where col4 = 
'%s'",
+                    schemaBuilder.getEntityTenantViewName(), 
outerCol4s.get(1)));
+            
dataReader.setTargetEntity(schemaBuilder.getEntityTenantViewName());
+
+            // Validate data before and after ttl expiration.
+            validateExpiredRowsAreNotReturnedUsingCounts(viewTTL, dataReader, 
schemaBuilder);
+
+
+            // Now update the above data but not modifying the covered columns.
+            // Ensure/validate that empty columns for the index are still 
updated.
+
+            // Data supplier where covered and included (col4 and col6) 
columns are not updated.
+            DataSupplier dataSupplierForNonCoveredColumns = new DataSupplier() 
{
+
+                @Override public List<Object> getValues(int rowIndex) {
+                    Random rnd = new Random();
+                    String id = String.format("00A0y000%07d", rowIndex);
+                    String zid = String.format("00B0y000%07d", rowIndex);
+                    String
+                            col5 =
+                            String.format("e%05d", rowIndex + 
rnd.nextInt(MAX_ROWS));
+                    String
+                            col7 =
+                            String.format("g%05d", rowIndex + 
rnd.nextInt(MAX_ROWS));
+                    String
+                            col8 =
+                            String.format("h%05d", rowIndex + 
rnd.nextInt(MAX_ROWS));
+                    String
+                            col9 =
+                            String.format("i%05d", rowIndex + 
rnd.nextInt(MAX_ROWS));
+
+                    return Lists.newArrayList(
+                            new Object[] { id, zid, col5, col7,
+                                    col8, col9 });
+                }
+            };
+
+            // Upsert data for validation with non covered columns
+            dataWriter.setDataSupplier(dataSupplierForNonCoveredColumns);
+            dataWriter.setUpsertColumns(nonCoveredColumns);
+            upsertData(dataWriter, DEFAULT_NUM_ROWS);
+
+            List<String> rowKeyColumns1 = Lists.newArrayList("ID", "COL6");
+            dataReader.setValidationColumns(rowKeyColumns1);
+            dataReader.setRowKeyColumns(rowKeyColumns1);
+            dataReader.setDML(String.format("SELECT id, col6 from %s where 
col4 = '%s'",
+                    schemaBuilder.getEntityTenantViewName(), 
outerCol4s.get(1)));
+
+            // Validate data before and after ttl expiration.
+            validateExpiredRowsAreNotReturnedUsingCounts(viewTTL, dataReader, 
schemaBuilder);
+
+        }
+    }
+
+    @Test public void testWithVariousSQLs() throws Exception {
+
+        long viewTTL = 10000;
+        TableOptions tableOptions = TableOptions.withDefaults();
+        tableOptions.getTableColumns().clear();
+        tableOptions.getTableColumnTypes().clear();
+
+        GlobalViewOptions
+                globalViewOptions =
+                SchemaBuilder.GlobalViewOptions.withDefaults();
+        globalViewOptions.setTableProps(String.format("VIEW_TTL=%d", viewTTL));
+
+        GlobalViewIndexOptions
+                globalViewIndexOptions =
+                SchemaBuilder.GlobalViewIndexOptions.withDefaults();
+        globalViewIndexOptions.setLocal(false);
+
+        TenantViewOptions tenantViewOptions = new TenantViewOptions();
+        tenantViewOptions
+                .setTenantViewColumns(asList("ZID", "COL7", "COL8", "COL9"));
+        tenantViewOptions
+                .setTenantViewColumnTypes(asList("CHAR(15)", "VARCHAR", 
"VARCHAR", "VARCHAR"));
+
+        tenantViewOptions.setTableProps(String.format("VIEW_TTL=%d", viewTTL));
+
+        OtherOptions testCaseWhenAllCFMatchAndAllDefault = new OtherOptions();
+        
testCaseWhenAllCFMatchAndAllDefault.setTestName("testCaseWhenAllCFMatchAndAllDefault");
+        testCaseWhenAllCFMatchAndAllDefault
+                .setTableCFs(Lists.newArrayList((String) null, null, null));
+        testCaseWhenAllCFMatchAndAllDefault
+                .setGlobalViewCFs(Lists.newArrayList((String) null, null, 
null));
+        
testCaseWhenAllCFMatchAndAllDefault.setTenantViewCFs(Lists.newArrayList((String)null,
 null, null, null));
+
+        // Define the test schema.
+        final SchemaBuilder schemaBuilder = new SchemaBuilder(getUrl());
+        schemaBuilder
+                .withTableOptions(tableOptions)
+                .withGlobalViewOptions(globalViewOptions)
+                .withGlobalViewIndexOptions(globalViewIndexOptions)
+                .withTenantViewOptions(tenantViewOptions)
+                .withOtherOptions(testCaseWhenAllCFMatchAndAllDefault)
+                .build();
+
+
+        // Define the test data.
+        final String groupById = String.format("00A0y000%07d", 0);
+        DataSupplier dataSupplier = new DataSupplier() {
+
+            @Override public List<Object> getValues(int rowIndex) {
+                Random rnd = new Random();
+                String zid = String.format("00B0y000%07d", rowIndex);
+                String
+                        col4 =
+                        String.format("d%05d", rowIndex + 
rnd.nextInt(MAX_ROWS));
+                String
+                        col5 =
+                        String.format("e%05d", rowIndex + 
rnd.nextInt(MAX_ROWS));
+                String
+                        col6 =
+                        String.format("f%05d", rowIndex + 
rnd.nextInt(MAX_ROWS));
+                String
+                        col7 =
+                        String.format("g%05d", rowIndex + 
rnd.nextInt(MAX_ROWS));
+                String
+                        col8 =
+                        String.format("h%05d", rowIndex + 
rnd.nextInt(MAX_ROWS));
+                String
+                        col9 =
+                        String.format("i%05d", rowIndex + 
rnd.nextInt(MAX_ROWS));
+
+                return Lists.newArrayList(
+                        new Object[] { groupById, zid, col4, col5, col6, col7,
+                                col8, col9 });
+            }
+        };
+
+        // Create a test data reader/writer for the above schema.
+        DataWriter dataWriter = new BasicDataWriter();
+        List<String> columns = Lists.newArrayList("ID", "ZID",
+                "COL4", "COL5", "COL6", "COL7", "COL8", "COL9");
+        List<String> rowKeyColumns = Lists.newArrayList("ID", "ZID");
+        String
+                tenantConnectUrl =
+                getUrl() + ';' + TENANT_ID_ATTRIB + '=' + 
schemaBuilder.getDataOptions()
+                        .getTenantId();
+        try (Connection writeConnection = DriverManager
+                .getConnection(tenantConnectUrl)) {
+            writeConnection.setAutoCommit(true);
+            dataWriter.setConnection(writeConnection);
+            dataWriter.setDataSupplier(dataSupplier);
+            dataWriter.setUpsertColumns(columns);
+            dataWriter.setRowKeyColumns(rowKeyColumns);
+            
dataWriter.setTargetEntity(schemaBuilder.getEntityTenantViewName());
+            upsertData(dataWriter, DEFAULT_NUM_ROWS);
+
+            // Case : count(1) sql
+            DataReader dataReader = new BasicDataReader();
+            dataReader.setValidationColumns(Arrays.asList("num_rows"));
+            dataReader.setRowKeyColumns(Arrays.asList("num_rows"));
+            dataReader.setDML(String.format("SELECT count(1) as num_rows from 
%s HAVING count(1) > 0",
+                    schemaBuilder.getEntityTenantViewName()));
+            
dataReader.setTargetEntity(schemaBuilder.getEntityTenantViewName());
+
+            // Validate data before and after ttl expiration.
+            validateExpiredRowsAreNotReturnedUsingCounts(viewTTL, dataReader, 
schemaBuilder);
+
+            // Case : group by sql
+            dataReader.setValidationColumns(Arrays.asList("num_rows"));
+            dataReader.setRowKeyColumns(Arrays.asList("num_rows"));
+            dataReader.setDML(String.format("SELECT count(1) as num_rows from 
%s GROUP BY ID HAVING count(1) > 0",
+                    schemaBuilder.getEntityTenantViewName(),
+                    groupById));
+
+            
dataReader.setTargetEntity(schemaBuilder.getEntityTenantViewName());
+
+            // Validate data before and after ttl expiration.
+            validateExpiredRowsAreNotReturnedUsingCounts(viewTTL, dataReader, 
schemaBuilder);
+        }
+    }
+
+    @Test public void testWithVariousSQLsForMultipleTenants() throws Exception 
{
+
+        long viewTTL = 10000;
+        TableOptions tableOptions = TableOptions.withDefaults();
+        tableOptions.getTableColumns().clear();
+        tableOptions.getTableColumnTypes().clear();
+
+        GlobalViewOptions
+                globalViewOptions =
+                SchemaBuilder.GlobalViewOptions.withDefaults();
+        globalViewOptions.setTableProps(String.format("VIEW_TTL=%d", viewTTL));
+
+        GlobalViewIndexOptions
+                globalViewIndexOptions =
+                SchemaBuilder.GlobalViewIndexOptions.withDefaults();
+        globalViewIndexOptions.setLocal(false);
+
+        TenantViewOptions tenantViewOptions = new TenantViewOptions();
+        tenantViewOptions
+                .setTenantViewColumns(asList("ZID", "COL7", "COL8", "COL9"));
+        tenantViewOptions
+                .setTenantViewColumnTypes(asList("CHAR(15)", "VARCHAR", 
"VARCHAR", "VARCHAR"));
+
+        tenantViewOptions.setTableProps(String.format("VIEW_TTL=%d", viewTTL));
+
+        OtherOptions testCaseWhenAllCFMatchAndAllDefault = new OtherOptions();
+        
testCaseWhenAllCFMatchAndAllDefault.setTestName("testCaseWhenAllCFMatchAndAllDefault");
+        testCaseWhenAllCFMatchAndAllDefault
+                .setTableCFs(Lists.newArrayList((String) null, null, null));
+        testCaseWhenAllCFMatchAndAllDefault
+                .setGlobalViewCFs(Lists.newArrayList((String) null, null, 
null));
+        
testCaseWhenAllCFMatchAndAllDefault.setTenantViewCFs(Lists.newArrayList((String)null,
 null, null, null));
+
+        final SchemaBuilder schemaBuilder = new SchemaBuilder(getUrl());
+        schemaBuilder
+                .withTableOptions(tableOptions)
+                .withGlobalViewOptions(globalViewOptions)
+                .withGlobalViewIndexOptions(globalViewIndexOptions)
+                .withTenantViewOptions(tenantViewOptions)
+                .withOtherOptions(testCaseWhenAllCFMatchAndAllDefault);
+
+        for (int tenant : Arrays.asList(new Integer[] {1, 2, 3})) {
+            // build schema for tenant
+            schemaBuilder.buildWithNewTenant();
+
+            // Define the test data.
+            final String groupById = String.format("00A0y000%07d", 0);
+            DataSupplier dataSupplier = new DataSupplier() {
+
+                @Override public List<Object> getValues(int rowIndex) {
+                    Random rnd = new Random();
+                    String zid = String.format("00B0y000%07d", rowIndex);
+                    String
+                            col4 =
+                            String.format("d%05d", rowIndex + 
rnd.nextInt(MAX_ROWS));
+                    String
+                            col5 =
+                            String.format("e%05d", rowIndex + 
rnd.nextInt(MAX_ROWS));
+                    String
+                            col6 =
+                            String.format("f%05d", rowIndex + 
rnd.nextInt(MAX_ROWS));
+                    String
+                            col7 =
+                            String.format("g%05d", rowIndex + 
rnd.nextInt(MAX_ROWS));
+                    String
+                            col8 =
+                            String.format("h%05d", rowIndex + 
rnd.nextInt(MAX_ROWS));
+                    String
+                            col9 =
+                            String.format("i%05d", rowIndex + 
rnd.nextInt(MAX_ROWS));
+
+                    return Lists.newArrayList(
+                            new Object[] { groupById, zid, col4, col5, col6, 
col7,
+                                    col8, col9 });
+                }
+            };
+
+            // Create a test data reader/writer for the above schema.
+            DataWriter dataWriter = new BasicDataWriter();
+            List<String> columns = Lists.newArrayList("ID", "ZID",
+                    "COL4", "COL5", "COL6", "COL7", "COL8", "COL9");
+            List<String> rowKeyColumns = Lists.newArrayList("ID", "ZID");
+            String
+                    tenantConnectUrl =
+                    getUrl() + ';' + TENANT_ID_ATTRIB + '=' + 
schemaBuilder.getDataOptions()
+                            .getTenantId();
+            try (Connection writeConnection = DriverManager
+                    .getConnection(tenantConnectUrl)) {
+                writeConnection.setAutoCommit(true);
+                dataWriter.setConnection(writeConnection);
+                dataWriter.setDataSupplier(dataSupplier);
+                dataWriter.setUpsertColumns(columns);
+                dataWriter.setRowKeyColumns(rowKeyColumns);
+                
dataWriter.setTargetEntity(schemaBuilder.getEntityTenantViewName());
+                upsertData(dataWriter, DEFAULT_NUM_ROWS);
+
+                // Case : count(1) sql
+                DataReader dataReader = new BasicDataReader();
+                dataReader.setValidationColumns(Arrays.asList("num_rows"));
+                dataReader.setRowKeyColumns(Arrays.asList("num_rows"));
+                dataReader.setDML(String.format("SELECT count(1) as num_rows 
from %s HAVING count(1) > 0",
+                        schemaBuilder.getEntityTenantViewName()));
+                
dataReader.setTargetEntity(schemaBuilder.getEntityTenantViewName());
+
+                // Validate data before and after ttl expiration.
+                validateExpiredRowsAreNotReturnedUsingCounts(viewTTL, 
dataReader, schemaBuilder);
+
+                // Case : group by sql
+                dataReader.setValidationColumns(Arrays.asList("num_rows"));
+                dataReader.setRowKeyColumns(Arrays.asList("num_rows"));
+                dataReader.setDML(String.format("SELECT count(1) as num_rows 
from %s GROUP BY ID HAVING count(1) > 0",
+                        schemaBuilder.getEntityTenantViewName()));
+
+                
dataReader.setTargetEntity(schemaBuilder.getEntityTenantViewName());
+
+                // Validate data before and after ttl expiration.
+                validateExpiredRowsAreNotReturnedUsingCounts(viewTTL, 
dataReader, schemaBuilder);
+            }
+        }
+    }
+
+    @Test public void testWithVariousSQLsForMultipleViews() throws Exception {
+
+        long viewTTL = 10000;
+        TableOptions tableOptions = TableOptions.withDefaults();
+        tableOptions.getTableColumns().clear();
+        tableOptions.getTableColumnTypes().clear();
+
+        TenantViewOptions tenantViewOptions = new TenantViewOptions();
+        tenantViewOptions
+                .setTenantViewColumns(asList("ZID", "COL7", "COL8", "COL9"));
+        tenantViewOptions
+                .setTenantViewColumnTypes(asList("CHAR(15)", "VARCHAR", 
"VARCHAR", "VARCHAR"));
+
+        tenantViewOptions.setTableProps(String.format("VIEW_TTL=%d", viewTTL));
+
+        OtherOptions testCaseWhenAllCFMatchAndAllDefault = new OtherOptions();
+        
testCaseWhenAllCFMatchAndAllDefault.setTestName("testCaseWhenAllCFMatchAndAllDefault");
+        testCaseWhenAllCFMatchAndAllDefault
+                .setTableCFs(Lists.newArrayList((String) null, null, null));
+        testCaseWhenAllCFMatchAndAllDefault
+                .setGlobalViewCFs(Lists.newArrayList((String) null, null, 
null));
+        
testCaseWhenAllCFMatchAndAllDefault.setTenantViewCFs(Lists.newArrayList((String)null,
 null, null, null));
+
+        final SchemaBuilder schemaBuilder = new SchemaBuilder(getUrl());
+        schemaBuilder
+                .withTableOptions(tableOptions)
+                .withTenantViewOptions(tenantViewOptions)
+                .withOtherOptions(testCaseWhenAllCFMatchAndAllDefault);
+
+        for (int view : Arrays.asList(new Integer[] {1, 2, 3})) {
+            // build schema for new view
+            schemaBuilder.buildNewView();
+
+            // Define the test data.
+            DataSupplier dataSupplier = new DataSupplier() {
+
+                @Override public List<Object> getValues(int rowIndex) {
+                    Random rnd = new Random();
+                    String zid = String.format("00B0y000%07d", rowIndex);
+                    String
+                            col7 =
+                            String.format("g%05d", rowIndex + 
rnd.nextInt(MAX_ROWS));
+                    String
+                            col8 =
+                            String.format("h%05d", rowIndex + 
rnd.nextInt(MAX_ROWS));
+                    String
+                            col9 =
+                            String.format("i%05d", rowIndex + 
rnd.nextInt(MAX_ROWS));
+
+                    return Lists.newArrayList(
+                            new Object[] { zid, col7, col8, col9 });
+                }
+            };
+
+            // Create a test data reader/writer for the above schema.
+            DataWriter dataWriter = new BasicDataWriter();
+            List<String> columns = Lists.newArrayList("ZID",
+                    "COL7", "COL8", "COL9");
+            List<String> rowKeyColumns = Lists.newArrayList("ZID");
+            String
+                    tenantConnectUrl =
+                    getUrl() + ';' + TENANT_ID_ATTRIB + '=' + 
schemaBuilder.getDataOptions()
+                            .getTenantId();
+            try (Connection writeConnection = DriverManager
+                    .getConnection(tenantConnectUrl)) {
+                writeConnection.setAutoCommit(true);
+                dataWriter.setConnection(writeConnection);
+                dataWriter.setDataSupplier(dataSupplier);
+                dataWriter.setUpsertColumns(columns);
+                dataWriter.setRowKeyColumns(rowKeyColumns);
+                
dataWriter.setTargetEntity(schemaBuilder.getEntityTenantViewName());
+                upsertData(dataWriter, DEFAULT_NUM_ROWS);
+
+                // Case : count(1) sql
+                DataReader dataReader = new BasicDataReader();
+                dataReader.setValidationColumns(Arrays.asList("num_rows"));
+                dataReader.setRowKeyColumns(Arrays.asList("num_rows"));
+                dataReader.setDML(String.format("SELECT count(1) as num_rows 
from %s HAVING count(1) > 0",
+                        schemaBuilder.getEntityTenantViewName()));
+                
dataReader.setTargetEntity(schemaBuilder.getEntityTenantViewName());
+
+                // Validate data before and after ttl expiration.
+                validateExpiredRowsAreNotReturnedUsingCounts(viewTTL, 
dataReader, schemaBuilder);
+
+                // Case : group by sql
+                dataReader.setValidationColumns(Arrays.asList("num_rows"));
+                dataReader.setRowKeyColumns(Arrays.asList("num_rows"));
+                dataReader.setDML(String.format("SELECT count(1) as num_rows 
from %s GROUP BY ZID HAVING count(1) > 0",
+                        schemaBuilder.getEntityTenantViewName()));
+
+                
dataReader.setTargetEntity(schemaBuilder.getEntityTenantViewName());
+
+                // Validate data before and after ttl expiration.
+                validateExpiredRowsAreNotReturnedUsingCounts(viewTTL, 
dataReader, schemaBuilder);
+            }
+        }
+    }
 
+    @Test
+    public void testWithTenantViewAndGlobalViewAndVariousOptions() throws 
Exception {
+        long viewTTL = 10000;
+
+        // Define the test schema
+        TableOptions tableOptions = TableOptions.withDefaults();
+
+        GlobalViewOptions
+                globalViewOptions =
+                SchemaBuilder.GlobalViewOptions.withDefaults();
+        globalViewOptions.setTableProps(String.format("VIEW_TTL=%d", viewTTL));
+
+        GlobalViewIndexOptions
+                globalViewIndexOptions =
+                GlobalViewIndexOptions.withDefaults();
+
+        TenantViewOptions tenantViewOptions = new TenantViewOptions();
+        tenantViewOptions
+                .setTenantViewColumns(asList("ZID", "COL7", "COL8", "COL9"));
+        tenantViewOptions
+                .setTenantViewColumnTypes(asList("CHAR(15)", "VARCHAR", 
"VARCHAR", "VARCHAR"));
+        tenantViewOptions.setTableProps(String.format("VIEW_TTL=%d", viewTTL));
+
+        TenantViewIndexOptions
+                tenantViewIndexOptions =
+                TenantViewIndexOptions.withDefaults();
+
+        OtherOptions testCaseWhenAllCFMatchAndAllDefault = new OtherOptions();
+        
testCaseWhenAllCFMatchAndAllDefault.setTestName("testCaseWhenAllCFMatchAndAllDefault");
+        testCaseWhenAllCFMatchAndAllDefault
+                .setTableCFs(Lists.newArrayList((String) null, null, null));
+        testCaseWhenAllCFMatchAndAllDefault
+                .setGlobalViewCFs(Lists.newArrayList((String) null, null, 
null));
+        
testCaseWhenAllCFMatchAndAllDefault.setTenantViewCFs(Lists.newArrayList((String)null,
 null, null, null));
+
+        for (String additionalProps : Lists
+                .newArrayList("COLUMN_ENCODED_BYTES=0", 
"DEFAULT_COLUMN_FAMILY='0'")) {
+
+            StringBuilder withTableProps = new StringBuilder();
+            
withTableProps.append("MULTI_TENANT=true,").append(additionalProps);
+
+            for (boolean isGlobalViewLocal : Lists.newArrayList(true, false)) {
+                for (boolean isTenantViewLocal : Lists.newArrayList(true, 
false)) {
+
+                    tableOptions.setTableProps(withTableProps.toString());
+                    globalViewIndexOptions.setLocal(isGlobalViewLocal);
+                    tenantViewIndexOptions.setLocal(isTenantViewLocal);
+                    OtherOptions otherOptions = 
testCaseWhenAllCFMatchAndAllDefault;
+
+                    final SchemaBuilder schemaBuilder = new 
SchemaBuilder(getUrl());
+                    schemaBuilder
+                            .withTableOptions(tableOptions)
+                            .withGlobalViewOptions(globalViewOptions)
+                            .withGlobalViewIndexOptions(globalViewIndexOptions)
+                            .withTenantViewOptions(tenantViewOptions)
+                            .withTenantViewIndexOptions(tenantViewIndexOptions)
+                            
.withOtherOptions(testCaseWhenAllCFMatchAndAllDefault)
+                            .buildWithNewTenant();
+
+                    // Define the test data.
+                    DataSupplier dataSupplier = new DataSupplier() {
+
+                        @Override public List<Object> getValues(int rowIndex) {
+                            Random rnd = new Random();
+                            String id = String.format("00A0y000%07d", 
rowIndex);
+                            String zid = String.format("00B0y000%07d", 
rowIndex);
+                            String
+                                    col1 =
+                                    String.format("a%05d", rowIndex + 
rnd.nextInt(MAX_ROWS));
+                            String
+                                    col2 =
+                                    String.format("b%05d", rowIndex + 
rnd.nextInt(MAX_ROWS));
+                            String
+                                    col3 =
+                                    String.format("c%05d", rowIndex + 
rnd.nextInt(MAX_ROWS));
+                            String
+                                    col4 =
+                                    String.format("d%05d", rowIndex + 
rnd.nextInt(MAX_ROWS));
+                            String
+                                    col5 =
+                                    String.format("e%05d", rowIndex + 
rnd.nextInt(MAX_ROWS));
+                            String
+                                    col6 =
+                                    String.format("f%05d", rowIndex + 
rnd.nextInt(MAX_ROWS));
+                            String
+                                    col7 =
+                                    String.format("g%05d", rowIndex + 
rnd.nextInt(MAX_ROWS));
+                            String
+                                    col8 =
+                                    String.format("h%05d", rowIndex + 
rnd.nextInt(MAX_ROWS));
+                            String

Review comment:
       Not sure what happened, was using the Phoenix template, but have 
reformatted it. Seems better!!




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to