This is an automated email from the ASF dual-hosted git repository. yanxinyi pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/master by this push: new 46779eb PHOENIX-6192 : Use tenant connection to resolve tenant views in syncUpdateCacheFreqAllIndexes() 46779eb is described below commit 46779eb7e82880bc58b56972e744912fde2346a3 Author: Viraj Jasani <vjas...@apache.org> AuthorDate: Sat Oct 24 14:49:55 2020 +0530 PHOENIX-6192 : Use tenant connection to resolve tenant views in syncUpdateCacheFreqAllIndexes() Signed-off-by: Xinyi Yan <yanxi...@apache.org> --- .../phoenix/end2end/SyncUpdateCacheFreqIT.java | 239 +++++++++++++++++++++ .../java/org/apache/phoenix/util/UpgradeUtil.java | 64 ++++-- 2 files changed, 287 insertions(+), 16 deletions(-) diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SyncUpdateCacheFreqIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SyncUpdateCacheFreqIT.java new file mode 100644 index 0000000..a4d4ff2 --- /dev/null +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SyncUpdateCacheFreqIT.java @@ -0,0 +1,239 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.end2end; + +import org.apache.phoenix.jdbc.PhoenixConnection; +import org.apache.phoenix.query.BaseTest; +import org.apache.phoenix.thirdparty.com.google.common.collect.ImmutableList; +import org.apache.phoenix.thirdparty.com.google.common.collect.ImmutableMap; +import org.apache.phoenix.thirdparty.com.google.common.collect.ImmutableSet; +import org.apache.phoenix.thirdparty.com.google.common.collect.Maps; +import org.apache.phoenix.util.PhoenixRuntime; +import org.apache.phoenix.util.ReadOnlyProps; +import org.apache.phoenix.util.SchemaUtil; +import org.apache.phoenix.util.UpgradeUtil; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Random; +import java.util.Set; + +import static org.apache.phoenix.util.UpgradeUtil.UPSERT_UPDATE_CACHE_FREQUENCY; +import static org.junit.Assert.assertEquals; + + +public class SyncUpdateCacheFreqIT extends BaseTest { + + private static final String SCHEMA_NAME = "SCHEMA2"; + private static final String TABLE_NAME = generateUniqueName(); + private static String tenant_name; + private static final String GLOBAL_INDEX = "GLOBAL_INDEX"; + private static final String LOCAL_INDEX = "LOCAL_INDEX"; + private static final String VIEW1_NAME = "VIEW1"; + private static final String VIEW1_INDEX1_NAME = "VIEW1_INDEX1"; + private static final String VIEW1_INDEX2_NAME = "VIEW1_INDEX2"; + private static final String VIEW2_NAME = "VIEW2"; + private static final String VIEW2_INDEX1_NAME = "VIEW2_INDEX1"; + private static final String VIEW2_INDEX2_NAME = "VIEW2_INDEX2"; + private static final String VIEW_INDEX_COL = "v2"; + private static final List<String> INDEXS_TO_UPDATE_CACHE_FREQ = + ImmutableList.of(VIEW1_INDEX1_NAME, VIEW2_INDEX1_NAME, VIEW1_INDEX2_NAME, + VIEW2_INDEX2_NAME); + private static final Map<String, List<String>> TABLE_TO_INDEX = + ImmutableMap.of(TABLE_NAME, ImmutableList.of(GLOBAL_INDEX, LOCAL_INDEX), + VIEW1_NAME, ImmutableList.of(VIEW1_INDEX1_NAME, VIEW1_INDEX2_NAME), + VIEW2_NAME, ImmutableList.of(VIEW2_INDEX1_NAME, VIEW2_INDEX2_NAME)); + private static final Set<String> GLOBAL_TABLES = + ImmutableSet.of(GLOBAL_INDEX, LOCAL_INDEX, TABLE_NAME); + + private static final int TABLE_CACHE_FREQ = 5000; + private static final int VIEW_CACHE_FREQ = 7000; + private static final Random RANDOM_INT = new Random(); + + private static final String CREATE_GLOBAL_INDEX = "CREATE INDEX %s ON %s(%s)"; + private static final String CREATE_LOCAL_INDEX = "CREATE LOCAL INDEX %s ON %s(%s)"; + + @BeforeClass + public static synchronized void doSetup() throws Exception { + Map<String, String> props = Maps.newHashMapWithExpectedSize(1); + setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator())); + createBaseTable(SCHEMA_NAME, TABLE_NAME, true, TABLE_CACHE_FREQ); + createIndex(getConnection(), SCHEMA_NAME, GLOBAL_INDEX, TABLE_NAME, + VIEW_INDEX_COL, false); + createIndex(getConnection(), SCHEMA_NAME, LOCAL_INDEX, TABLE_NAME, + VIEW_INDEX_COL, true); + } + + @Test + public void testSyncCacheFreqWithTenantView() throws Exception { + for (int i = 1; i <= 3; i++) { + // verify tenant view resolution with cache freq sync with different + // tenants + tenant_name = "TENANT_" + i; + try (Connection conn = getTenantConnection(tenant_name)) { + createView(conn, SCHEMA_NAME, VIEW1_NAME, TABLE_NAME); + createIndex(conn, SCHEMA_NAME, VIEW1_INDEX1_NAME, VIEW1_NAME, VIEW_INDEX_COL, + false); + createIndex(conn, SCHEMA_NAME, VIEW1_INDEX2_NAME, VIEW1_NAME, VIEW_INDEX_COL, + false); + createView(conn, SCHEMA_NAME, VIEW2_NAME, VIEW1_NAME); + createIndex(conn, SCHEMA_NAME, VIEW2_INDEX1_NAME, VIEW2_NAME, VIEW_INDEX_COL, + false); + createIndex(conn, SCHEMA_NAME, VIEW2_INDEX2_NAME, VIEW2_NAME, VIEW_INDEX_COL, + false); + } + + try (Connection conn = getConnection()) { + PreparedStatement stmt = + conn.prepareStatement(UPSERT_UPDATE_CACHE_FREQUENCY); + + Map<String, Long> updatedIndexFreqMap = new HashMap<>(); + // use random numbers to update frequencies of all indexes + for (String index : INDEXS_TO_UPDATE_CACHE_FREQ) { + long updatedCacheFreq = RANDOM_INT.nextInt(4000); + updatedIndexFreqMap.put(index, updatedCacheFreq); + updateCacheFreq(index, updatedCacheFreq, stmt); + } + stmt.executeBatch(); + conn.commit(); + + // clear the server-side cache to get the latest built PTables + conn.unwrap(PhoenixConnection.class).getQueryServices().clearCache(); + + // assert that new updated cache frequencies are present + // hence, index frequencies are different from parent table/view cache frequencies + for (String table : updatedIndexFreqMap.keySet()) { + assertTableFrequencies(conn, table, + updatedIndexFreqMap.get(table)); + } + + // clear the server-side cache to get the latest built PTables + conn.unwrap(PhoenixConnection.class).getQueryServices().clearCache(); + PhoenixConnection pcon = conn.unwrap(PhoenixConnection.class); + pcon.setRunningUpgrade(true); + + UpgradeUtil.syncUpdateCacheFreqAllIndexes(pcon, + PhoenixRuntime.getTableNoCache(conn, + SchemaUtil.getTableName(SCHEMA_NAME, TABLE_NAME))); + + // assert that index frequencies are in sync with table/view cache frequencies + for (String tableOrView : TABLE_TO_INDEX.keySet()) { + final long expectedFreqForTableAndIndex; + if (tableOrView.equals(TABLE_NAME)) { + expectedFreqForTableAndIndex = TABLE_CACHE_FREQ; + } else { + expectedFreqForTableAndIndex = VIEW_CACHE_FREQ; + } + assertTableFrequencies(conn, tableOrView, + expectedFreqForTableAndIndex); + for (String index : TABLE_TO_INDEX.get(tableOrView)) { + assertTableFrequencies(conn, index, expectedFreqForTableAndIndex); + } + } + } + } + } + + private void updateCacheFreq(String tableName, + long freq, PreparedStatement stmt) throws SQLException { + stmt.setString(1, tenant_name); + stmt.setString(2, SCHEMA_NAME); + stmt.setString(3, tableName); + stmt.setLong(4, freq); + stmt.addBatch(); + } + + private void assertTableFrequencies(Connection conn, + String tableName, long expectedCacheFreq) throws SQLException { + conn.unwrap(PhoenixConnection.class).getQueryServices().clearCache(); + ResultSet rs; + if (GLOBAL_TABLES.contains(tableName)) { + rs = conn.createStatement().executeQuery(String.format( + "SELECT UPDATE_CACHE_FREQUENCY FROM SYSTEM.CATALOG WHERE " + + "TABLE_NAME='%s'", tableName)); + } else { + rs = conn.createStatement().executeQuery(String.format( + "SELECT UPDATE_CACHE_FREQUENCY FROM SYSTEM.CATALOG WHERE " + + "TABLE_NAME='%s' AND TENANT_ID='%s'", + tableName, tenant_name)); + } + rs.next(); + long cacheFreq = rs.getLong(1); + assertEquals("Cache Freq for " + tableName + " not matching. actual: " + + cacheFreq + " , expected: " + expectedCacheFreq, + expectedCacheFreq, cacheFreq); + } + + private static void createBaseTable(String schemaName, String tableName, + boolean multiTenant, int cacheFre) throws SQLException { + Connection conn = getConnection(); + String ddl = + "CREATE TABLE " + SchemaUtil.getTableName(schemaName, tableName) + + " (t_id VARCHAR NOT NULL,\n" + "k1 VARCHAR NOT NULL,\n" + + "k2 INTEGER,\n" + "v1 VARCHAR,\n" + VIEW_INDEX_COL + + " INTEGER,\n" + "CONSTRAINT pk PRIMARY KEY (t_id, k1))\n"; + String ddlOptions = multiTenant ? "MULTI_TENANT=true" : ""; + ddlOptions = ddlOptions + (ddlOptions.isEmpty() ? "" : ",") + + "UPDATE_CACHE_FREQUENCY=" + cacheFre; + conn.createStatement().execute(ddl + ddlOptions); + conn.close(); + } + + private static void createIndex(Connection conn, String schemaName, + String indexName, String tableName, String indexColumn, boolean isLocal) + throws SQLException { + String fullTableName = SchemaUtil.getTableName(schemaName, tableName); + conn.createStatement().execute(String + .format(isLocal ? CREATE_LOCAL_INDEX : CREATE_GLOBAL_INDEX, + indexName, fullTableName, indexColumn)); + conn.commit(); + } + + private static void createView(Connection conn, String schemaName, + String viewName, String baseTableName) + throws SQLException { + String fullViewName = SchemaUtil.getTableName(schemaName, viewName); + String fullTableName = SchemaUtil.getTableName(schemaName, baseTableName); + conn.createStatement().execute(String.format( + "CREATE VIEW %s AS SELECT * FROM %s UPDATE_CACHE_FREQUENCY=%s", + fullViewName, fullTableName, VIEW_CACHE_FREQ)); + conn.commit(); + } + + private static Connection getTenantConnection(String tenant) + throws SQLException { + Properties props = new Properties(); + props.setProperty(PhoenixRuntime.TENANT_ID_ATTRIB, tenant); + return DriverManager.getConnection(getUrl(), props); + } + + private static Connection getConnection() throws SQLException { + Properties props = new Properties(); + return DriverManager.getConnection(getUrl(), props); + } + +} diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/UpgradeUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/UpgradeUtil.java index 9d67fda..3bd5e53 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/util/UpgradeUtil.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/util/UpgradeUtil.java @@ -75,6 +75,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.Optional; import java.util.Properties; import java.util.Set; import java.util.concurrent.TimeoutException; @@ -82,6 +83,7 @@ import java.util.concurrent.TimeoutException; import javax.annotation.Nonnull; import javax.annotation.Nullable; +import org.apache.commons.lang3.StringUtils; import org.apache.phoenix.thirdparty.com.google.common.base.Strings; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; @@ -1329,11 +1331,12 @@ public class UpgradeUtil { } private static void syncUpdateCacheFreqForIndexesOfTable(PTable baseTable, - PreparedStatement stmt) throws SQLException { + PreparedStatement stmt, String tenantId) throws SQLException { for (PTable index : baseTable.getIndexes()) { if (index.getUpdateCacheFrequency() == baseTable.getUpdateCacheFrequency()) { continue; } + stmt.setString(1, tenantId); stmt.setString(2, index.getSchemaName().getString()); stmt.setString(3, index.getTableName().getString()); stmt.setLong(4, baseTable.getUpdateCacheFrequency()); @@ -1359,9 +1362,9 @@ public class UpgradeUtil { newConn.getTenantId().getBytes() : null; PreparedStatement stmt = - newConn.prepareStatement(UPSERT_UPDATE_CACHE_FREQUENCY); - stmt.setString(1, Bytes.toString(tenantId)); - syncUpdateCacheFreqForIndexesOfTable(table, stmt); + newConn.prepareStatement(UPSERT_UPDATE_CACHE_FREQUENCY); + syncUpdateCacheFreqForIndexesOfTable(table, stmt, + Bytes.toString(tenantId)); TableViewFinderResult childViewsResult = new TableViewFinderResult(); for (int i=0; i<2; i++) { @@ -1375,18 +1378,9 @@ public class UpgradeUtil { LinkType.CHILD_TABLE, childViewsResult); // Iterate over the chain of child views - for (TableInfo viewInfo: childViewsResult.getLinks()) { - PTable view; - String viewName = SchemaUtil.getTableName(viewInfo.getSchemaName(), - viewInfo.getTableName()); - try { - view = PhoenixRuntime.getTable(newConn, viewName); - } catch (TableNotFoundException e) { - // Ignore - LOGGER.error("Error getting PTable for view: " + viewInfo, e); - continue; - } - syncUpdateCacheFreqForIndexesOfTable(view, stmt); + for (TableInfo tableInfo : childViewsResult.getLinks()) { + getViewAndSyncCacheFreqForIndexes(newConn, stmt, + tableInfo); } break; } catch (TableNotFoundException ex) { @@ -1402,6 +1396,44 @@ public class UpgradeUtil { } } + private static void getViewAndSyncCacheFreqForIndexes( + final PhoenixConnection newConn, + final PreparedStatement stmt, final TableInfo tableInfo) + throws SQLException { + final String viewName = SchemaUtil.getTableName( + tableInfo.getSchemaName(), tableInfo.getTableName()); + final String viewTenantId = Bytes.toString(tableInfo.getTenantId()); + final Optional<PTable> view; + if (StringUtils.isNotEmpty(viewTenantId)) { + Properties props = new Properties(newConn.getClientInfo()); + props.setProperty(PhoenixRuntime.TENANT_ID_ATTRIB, viewTenantId); + // use tenant connection to resolve tenant views + try (PhoenixConnection tenantConn = + new PhoenixConnection(newConn, props)) { + view = resolveView(viewName, tenantConn); + } + } else { + view = resolveView(viewName, newConn); + } + if (view.isPresent()) { + syncUpdateCacheFreqForIndexesOfTable(view.get(), stmt, + viewTenantId); + } + } + + private static Optional<PTable> resolveView(final String viewName, + final PhoenixConnection conn) throws SQLException { + PTable view; + try { + view = PhoenixRuntime.getTable(conn, viewName); + } catch (TableNotFoundException e) { + // Ignore + LOGGER.error("Error getting PTable for view: {}", viewName, e); + return Optional.empty(); + } + return Optional.of(view); + } + /** * Make sure that all tables have necessary column family properties in sync * with each other and also in sync with all the table's indexes