This is an automated email from the ASF dual-hosted git repository.
gokcen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/master by this push:
new 13adc7e PHOENIX-6649 TransformTool to support views and tenant views
13adc7e is described below
commit 13adc7e7996b090f2be625abda1b7cdf356edb24
Author: Gokcen Iskender <[email protected]>
AuthorDate: Wed Feb 16 14:25:41 2022 -0800
PHOENIX-6649 TransformTool to support views and tenant views
Signed-off-by: Gokcen Iskender <[email protected]>
---
.../transform/TransformMonitorExtendedIT.java | 142 ++++++++++++++-
.../end2end/transform/TransformMonitorIT.java | 30 +++-
.../phoenix/end2end/transform/TransformToolIT.java | 191 ++++++++++++++++++++-
.../phoenix/mapreduce/PhoenixInputFormat.java | 2 +-
.../PhoenixTransformWithViewsInputFormat.java | 129 ++++++++++++++
.../phoenix/mapreduce/transform/TransformTool.java | 33 +++-
.../org/apache/phoenix/schema/MetaDataClient.java | 7 +
.../apache/phoenix/schema/transform/Transform.java | 65 +++++--
.../schema/transform/TransformMaintainer.java | 10 +-
9 files changed, 570 insertions(+), 39 deletions(-)
diff --git
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/transform/TransformMonitorExtendedIT.java
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/transform/TransformMonitorExtendedIT.java
index 258133b..f2fe434 100644
---
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/transform/TransformMonitorExtendedIT.java
+++
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/transform/TransformMonitorExtendedIT.java
@@ -18,6 +18,7 @@
package org.apache.phoenix.end2end.transform;
import
org.apache.phoenix.compat.hbase.coprocessor.CompatBaseScannerRegionObserver;
+import org.apache.hadoop.hbase.client.Admin;
import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
@@ -43,11 +44,13 @@ import org.junit.experimental.categories.Category;
import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
+import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.Map;
import java.util.Properties;
import static
org.apache.phoenix.end2end.transform.TransformMonitorIT.waitForTransformToGetToState;
+import static
org.apache.phoenix.end2end.transform.TransformToolIT.getTenantConnection;
import static
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA;
import static
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_TASK_TABLE;
import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
@@ -70,7 +73,7 @@ public class TransformMonitorExtendedIT extends BaseTest {
serverProps.put(QueryServices.IS_NAMESPACE_MAPPING_ENABLED,
Boolean.TRUE.toString());
Map<String, String> clientProps = Maps.newHashMapWithExpectedSize(1);
- clientProps.put(QueryServices.IS_NAMESPACE_MAPPING_ENABLED,
Boolean.TRUE.toString());
+ clientProps.put(QueryServices.IS_NAMESPACE_MAPPING_ENABLED,
Boolean.TRUE.toString());
setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()),
new ReadOnlyProps(clientProps.entrySet().iterator()));
@@ -103,9 +106,9 @@ public class TransformMonitorExtendedIT extends BaseTest {
public void testTransformIndexWithNamespaceEnabled() throws Exception {
String schemaName = "S_" + generateUniqueName();
String dataTableName = "TBL_" + generateUniqueName();
- String fullDataTableName = SchemaUtil.getTableName(schemaName ,
dataTableName);
+ String fullDataTableName = SchemaUtil.getTableName(schemaName,
dataTableName);
String indexName = "IDX_" + generateUniqueName();
- String fullIndexName = SchemaUtil.getTableName(schemaName , indexName);
+ String fullIndexName = SchemaUtil.getTableName(schemaName, indexName);
String createIndexStmt = "CREATE INDEX %s ON " + fullDataTableName + "
(NAME) INCLUDE (ZIP) ";
try (Connection conn = DriverManager.getConnection(getUrl(),
propsNamespace)) {
conn.setAutoCommit(true);
@@ -129,13 +132,14 @@ public class TransformMonitorExtendedIT extends BaseTest {
ResultSet rs = conn.createStatement().executeQuery("SELECT
\":ID\", \"0:ZIP\" FROM " + fullIndexName);
assertTrue(rs.next());
assertEquals("1", rs.getString(1));
- assertEquals( 95051, rs.getInt(2));
+ assertEquals(95051, rs.getInt(2));
assertTrue(rs.next());
assertEquals("2", rs.getString(1));
- assertEquals( 95052, rs.getInt(2));
+ assertEquals(95052, rs.getInt(2));
assertFalse(rs.next());
}
}
+
@Test
public void testTransformTableWithNamespaceEnabled() throws Exception {
String schemaName = "S_" + generateUniqueName();
@@ -156,16 +160,138 @@ public class TransformMonitorExtendedIT extends BaseTest
{
waitForTransformToGetToState(conn.unwrap(PhoenixConnection.class),
record, PTable.TransformStatus.COMPLETED);
SingleCellIndexIT.assertMetadata(conn,
PTable.ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS,
PTable.QualifierEncodingScheme.TWO_BYTE_QUALIFIERS,
record.getNewPhysicalTableName());
TransformToolIT.upsertRows(conn, fullDataTableName, 2, 1);
- assertEquals(numOfRows+1, TransformMonitorIT.countRows(conn,
fullDataTableName));
+ assertEquals(numOfRows + 1, TransformMonitorIT.countRows(conn,
fullDataTableName));
ResultSet rs = conn.createStatement().executeQuery("SELECT ID, ZIP
FROM " + fullDataTableName);
assertTrue(rs.next());
assertEquals("1", rs.getString(1));
- assertEquals( 95051, rs.getInt(2));
+ assertEquals(95051, rs.getInt(2));
assertTrue(rs.next());
assertEquals("2", rs.getString(1));
- assertEquals( 95052, rs.getInt(2));
+ assertEquals(95052, rs.getInt(2));
assertFalse(rs.next());
}
}
+
+ @Test
+ public void testTransformWithGlobalAndTenantViews() throws Exception {
+ String schemaName = generateUniqueName();
+ String dataTableName1 = generateUniqueName();
+ String dataTableFullName1 = SchemaUtil.getTableName(schemaName,
dataTableName1);
+ String namespaceMappedDataTableName1 =
SchemaUtil.getPhysicalHBaseTableName(schemaName, dataTableName1,
true).getString();
+ String view1Name = SchemaUtil.getTableName(schemaName, "VW1_" +
generateUniqueName());
+ String view2Name = SchemaUtil.getTableName(schemaName, "VW2_" +
generateUniqueName());
+ String tenantView = SchemaUtil.getTableName(schemaName, "VWT_" +
generateUniqueName());
+ String readOnlyTenantView = SchemaUtil.getTableName(schemaName,
"ROVWT_" + generateUniqueName());
+
+ try (Connection conn = DriverManager.getConnection(getUrl(),
propsNamespace)) {
+ conn.setAutoCommit(true);
+ int numOfRows = 1;
+ conn.createStatement().execute("CREATE SCHEMA IF NOT EXISTS " +
schemaName);
+ TransformToolIT.createTableAndUpsertRows(conn, dataTableFullName1,
numOfRows, "TABLE_ONLY", dataTableDdl);
+
+ SingleCellIndexIT.assertMetadata(conn,
PTable.ImmutableStorageScheme.ONE_CELL_PER_COLUMN,
PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS, dataTableFullName1);
+
+ String createViewSql = "CREATE VIEW " + view1Name + " ( VIEW_COL1
INTEGER, VIEW_COL2 VARCHAR ) AS SELECT * FROM "
+ + dataTableFullName1 + " where DATA='GLOBAL_VIEW' ";
+ conn.createStatement().execute(createViewSql);
+ PreparedStatement stmt1 =
conn.prepareStatement(String.format("UPSERT INTO %s VALUES(?, ? , ?, ?, ?,?)",
view1Name));
+ stmt1.setInt(1, 2);
+ stmt1.setString(2, "uname2");
+ stmt1.setInt(3, 95053);
+ stmt1.setString(4, "GLOBAL_VIEW");
+ stmt1.setInt(5, 111);
+ stmt1.setString(6, "viewcol2");
+ stmt1.executeUpdate();
+
+ createViewSql = "CREATE VIEW " + view2Name + " ( VIEW_COL1
INTEGER, VIEW_COL2 VARCHAR ) AS SELECT * FROM "
+ + dataTableFullName1 + " where DATA='GLOBAL_VIEW' AND
ZIP=95053";
+ conn.createStatement().execute(createViewSql);
+ stmt1 = conn.prepareStatement(String.format("UPSERT INTO %s
VALUES(?, ? , ?, ?, ?,?)", view1Name));
+ stmt1.setInt(1, 20);
+ stmt1.setString(2, "uname22");
+ stmt1.setInt(3, 95053);
+ stmt1.setString(4, "GLOBAL_VIEW");
+ stmt1.setInt(5, 111);
+ stmt1.setString(6, "viewcol22");
+ stmt1.executeUpdate();
+ }
+
+ try (Connection tenantConn1 = getTenantConnection("tenant1")) {
+ tenantConn1.setAutoCommit(true);
+ String createViewSql = "CREATE VIEW " + tenantView + " (
VIEW_TCOL1 INTEGER, VIEW_TCOL2 VARCHAR ) " +
+ " AS SELECT * FROM "
+ + dataTableFullName1 + " where DATA='TENANT_VIEW'";
+ tenantConn1.createStatement().execute(createViewSql);
+
+ PreparedStatement stmt1 = tenantConn1.prepareStatement(
+ String.format("UPSERT INTO %s (ID, NAME, ZIP, DATA,
VIEW_TCOL1, VIEW_TCOL2) " +
+ "VALUES(?, ? , ?, ?, ?, ?)", tenantView));
+ stmt1.setInt(1, 4);
+ stmt1.setString(2, "uname4");
+ stmt1.setInt(3, 95054);
+ stmt1.setString(4, "TENANT_VIEW");
+ stmt1.setInt(5, 2001);
+ stmt1.setString(6, "tenantviewcol");
+ stmt1.executeUpdate();
+
+ // ZIP field values are like 95050 + i
+ createViewSql = "CREATE VIEW " + readOnlyTenantView + " (
VIEW_TCOL1 INTEGER, VIEW_TCOL2 VARCHAR ) AS SELECT * FROM "
+ + dataTableFullName1 + " where DATA='TENANT_VIEW' AND ZIP
> 95050";
+ tenantConn1.createStatement().execute(createViewSql);
+ }
+
+ try (Connection conn = DriverManager.getConnection(getUrl(),
propsNamespace)) {
+ conn.setAutoCommit(true);
+ conn.createStatement().execute("ALTER TABLE " + dataTableFullName1
+
+ " SET
IMMUTABLE_STORAGE_SCHEME=SINGLE_CELL_ARRAY_WITH_OFFSETS,
COLUMN_ENCODED_BYTES=2");
+ SystemTransformRecord record =
Transform.getTransformRecord(schemaName, dataTableName1, null, null,
conn.unwrap(PhoenixConnection.class));
+ assertNotNull(record);
+ waitForTransformToGetToState(conn.unwrap(PhoenixConnection.class),
record, PTable.TransformStatus.COMPLETED);
+ assertMetadata(conn,
PTable.ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS,
PTable.QualifierEncodingScheme.TWO_BYTE_QUALIFIERS,
record.getNewPhysicalTableName());
+
+ try (Admin admin =
conn.unwrap(PhoenixConnection.class).getQueryServices().getAdmin()) {
+
admin.disableTable(TableName.valueOf(namespaceMappedDataTableName1));
+
admin.truncateTable(TableName.valueOf(namespaceMappedDataTableName1), true);
+ }
+
+ SingleCellIndexIT.dumpTable(schemaName + ":" + dataTableName1 +
"_1");
+
+ String sql = "SELECT VIEW_COL1, VIEW_COL2 FROM %s WHERE
DATA='GLOBAL_VIEW' ";
+ ResultSet rs1 =
conn.createStatement().executeQuery(String.format(sql, view1Name));
+ assertTrue(rs1.next());
+ assertEquals(111, rs1.getInt(1));
+ assertEquals("viewcol2", rs1.getString(2));
+ assertTrue(rs1.next());
+ assertEquals("viewcol22", rs1.getString(2));
+ assertFalse(rs1.next());
+
+ rs1 = conn.createStatement().executeQuery(String.format(sql,
view2Name));
+ assertTrue(rs1.next());
+ assertEquals(111, rs1.getInt(1));
+ assertEquals("viewcol2", rs1.getString(2));
+ assertTrue(rs1.next());
+ assertEquals("viewcol22", rs1.getString(2));
+ assertFalse(rs1.next());
+
+ sql = "SELECT DATA FROM %s WHERE ID=1";
+ rs1 = conn.createStatement().executeQuery(String.format(sql,
dataTableFullName1));
+ assertFalse(rs1.next());
+ }
+
+ try (Connection tenantConn1 = getTenantConnection("tenant1")) {
+ String sql = "SELECT VIEW_TCOL1, VIEW_TCOL2 FROM %s ";
+ ResultSet rs1 =
tenantConn1.createStatement().executeQuery(String.format(sql, tenantView));
+
+ assertTrue(rs1.next());
+ assertEquals(2001, rs1.getInt(1));
+ assertEquals("tenantviewcol", rs1.getString(2));
+
+ ResultSet rs2 =
tenantConn1.createStatement().executeQuery(String.format(sql,
readOnlyTenantView));
+ assertTrue(rs2.next());
+ assertEquals(2001, rs2.getInt(1));
+ assertEquals("tenantviewcol", rs2.getString(2));
+ }
+ }
+
}
diff --git
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/transform/TransformMonitorIT.java
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/transform/TransformMonitorIT.java
index 42506fc..48ec70b 100644
---
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/transform/TransformMonitorIT.java
+++
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/transform/TransformMonitorIT.java
@@ -169,6 +169,11 @@ public class TransformMonitorIT extends
ParallelStatsDisabledIT {
assertMetadata(conn,
PTable.ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS,
PTable.QualifierEncodingScheme.TWO_BYTE_QUALIFIERS, viewName);
conn.unwrap(PhoenixConnection.class).getQueryServices().clearCache();
+ ResultSet rs = conn.createStatement().executeQuery("SELECT
VIEW_COL2 FROM " + viewName + " WHERE VIEW_COL1=100");
+ assertTrue(rs.next());
+ assertEquals("viewCol2", rs.getString(1));
+ assertFalse(rs.next());
+
int additionalRows = 2;
// Upsert new rows to new table. Note that after transform is
complete, we are using the new table
TransformToolIT.upsertRows(conn, viewName, (int)newRowCount+1,
additionalRows);
@@ -190,7 +195,7 @@ public class TransformMonitorIT extends
ParallelStatsDisabledIT {
assertEquals((newRowCount+additionalRows)*2,
countRowsForViewIndex(conn, dataTableFullName));
conn.createStatement().execute("UPSERT INTO " + viewName2 +
"(ID, NAME, VIEW_COL1, VIEW_COL2) VALUES (100, 'uname100', 1000,
'viewCol100')");
- ResultSet rs = conn.createStatement().executeQuery("SELECT
VIEW_COL2, NAME FROM " + viewName2 + " WHERE VIEW_COL1=1000");
+ rs = conn.createStatement().executeQuery("SELECT VIEW_COL2,
NAME FROM " + viewName2 + " WHERE VIEW_COL1=1000");
assertTrue(rs.next());
assertEquals("viewCol100", rs.getString(1));
assertEquals("uname100", rs.getString(2));
@@ -536,20 +541,23 @@ public class TransformMonitorIT extends
ParallelStatsDisabledIT {
try (Connection conn1 = DriverManager.getConnection(getUrl(),
testProps)) {
conn1.setAutoCommit(true);
int numOfRows = 1;
- TransformToolIT.createTableAndUpsertRows(conn1, dataTableName,
numOfRows, isImmutable? " IMMUTABLE_ROWS=true" : "");
+ TransformToolIT.createTableAndUpsertRows(conn1, dataTableName,
numOfRows, isImmutable ? " IMMUTABLE_ROWS=true" : "");
- conn1.createStatement().execute("ALTER TABLE " + dataTableName +
- " SET
IMMUTABLE_STORAGE_SCHEME=SINGLE_CELL_ARRAY_WITH_OFFSETS,
COLUMN_ENCODED_BYTES=2");
- SystemTransformRecord record = Transform.getTransformRecord(null,
dataTableName, null, null, conn1.unwrap(PhoenixConnection.class));
- assertNotNull(record);
-
waitForTransformToGetToState(conn1.unwrap(PhoenixConnection.class), record,
PTable.TransformStatus.COMPLETED);
-
- // A connection does transform and another connection doesn't try
to upsert into old table
String url2 = url + PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR +
"LongRunningQueries";
try (Connection conn2 = DriverManager.getConnection(url2,
PropertiesUtil.deepCopy(TEST_PROPERTIES))) {
conn2.setAutoCommit(true);
TransformToolIT.upsertRows(conn2, dataTableName, 2, 1);
+ conn1.createStatement().execute("ALTER TABLE " + dataTableName
+
+ " SET
IMMUTABLE_STORAGE_SCHEME=SINGLE_CELL_ARRAY_WITH_OFFSETS,
COLUMN_ENCODED_BYTES=2");
+ SystemTransformRecord record =
Transform.getTransformRecord(null, dataTableName, null, null,
conn1.unwrap(PhoenixConnection.class));
+ assertNotNull(record);
+
waitForTransformToGetToState(conn1.unwrap(PhoenixConnection.class), record,
PTable.TransformStatus.COMPLETED);
+ assertMetadata(conn1,
PTable.ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS,
PTable.QualifierEncodingScheme.TWO_BYTE_QUALIFIERS,
record.getNewPhysicalTableName());
+
+ // A connection does transform and another connection doesn't
try to upsert into old table
+ TransformToolIT.upsertRows(conn2, dataTableName, 3, 1);
+
ResultSet rs = conn2.createStatement().executeQuery("SELECT
ID, NAME, ZIP FROM " + dataTableName);
assertTrue(rs.next());
assertEquals("1", rs.getString(1));
@@ -559,6 +567,10 @@ public class TransformMonitorIT extends
ParallelStatsDisabledIT {
assertEquals("2", rs.getString(1));
assertEquals("uname2", rs.getString(2));
assertEquals( 95052, rs.getInt(3));
+ assertTrue(rs.next());
+ assertEquals("3", rs.getString(1));
+ assertEquals("uname3", rs.getString(2));
+ assertEquals( 95053, rs.getInt(3));
assertFalse(rs.next());
}
}
diff --git
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/transform/TransformToolIT.java
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/transform/TransformToolIT.java
index e0702f8..dd57ce0 100644
---
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/transform/TransformToolIT.java
+++
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/transform/TransformToolIT.java
@@ -73,7 +73,6 @@ import static
org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.BEF
import static
org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.BEFORE_REPAIR_EXTRA_UNVERIFIED_INDEX_ROW_COUNT;
import static
org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.REBUILT_INDEX_ROW_COUNT;
import static
org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.SCANNED_DATA_ROW_COUNT;
-import static org.apache.phoenix.query.QueryConstants.EMPTY_COLUMN_VALUE_BYTES;
import static org.apache.phoenix.query.QueryConstants.UNVERIFIED_BYTES;
import static org.apache.phoenix.query.QueryConstants.VERIFIED_BYTES;
import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
@@ -137,22 +136,33 @@ public class TransformToolIT extends
ParallelStatsDisabledIT {
}
public static void createTableAndUpsertRows(Connection conn, String
dataTableFullName, int numOfRows, String tableOptions) throws SQLException {
+ createTableAndUpsertRows(conn, dataTableFullName, numOfRows, "",
tableOptions);
+ }
+
+ public static void createTableAndUpsertRows(Connection conn, String
dataTableFullName, int numOfRows, String constantVal, String tableOptions)
throws SQLException {
String stmString1 =
"CREATE TABLE IF NOT EXISTS " + dataTableFullName
- + " (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR,
ZIP INTEGER) "
+ + " (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR,
ZIP INTEGER, DATA VARCHAR) "
+ tableOptions;
conn.createStatement().execute(stmString1);
- upsertRows(conn, dataTableFullName, 1, numOfRows);
+ upsertRows(conn, dataTableFullName, 1, numOfRows, constantVal);
conn.commit();
}
public static void upsertRows(Connection conn, String dataTableFullName,
int startIdx, int numOfRows) throws SQLException {
- String upsertQuery = String.format("UPSERT INTO %s VALUES(?, ?, ?)",
dataTableFullName);
- PreparedStatement stmt1 = conn.prepareStatement(upsertQuery);
+ upsertRows(conn, dataTableFullName, startIdx, numOfRows, "");
+ }
+ public static void upsertRows(Connection conn, String dataTableFullName,
int startIdx, int numOfRows, String constantVal) throws SQLException {
+ String upsertQuery = String.format("UPSERT INTO %s VALUES(?, ?, ?,
?)", dataTableFullName);
+ PreparedStatement stmt = conn.prepareStatement(upsertQuery);
// insert rows
for (int i = startIdx; i < startIdx+numOfRows; i++) {
- IndexToolIT.upsertRow(stmt1, i);
+ stmt.setInt(1, i);
+ stmt.setString(2, "uname" + String.valueOf(i));
+ stmt.setInt(3, 95050 + i);
+ stmt.setString(4, constantVal);
+ stmt.executeUpdate();
}
}
@Test
@@ -948,6 +958,175 @@ public class TransformToolIT extends
ParallelStatsDisabledIT {
}
}
+ @Test
+ public void testTransformForGlobalViews() throws Exception {
+ String schemaName = generateUniqueName();
+ String dataTableName = generateUniqueName();
+ String dataTableFullName = SchemaUtil.getTableName(schemaName,
dataTableName);
+ String view1Name = "VW1_" + generateUniqueName();
+ String view2Name = "VW2_" + generateUniqueName();
+ String upsertQuery = "UPSERT INTO %s VALUES(?, ?, ?, ?, ?, ?)";
+
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+ conn.setAutoCommit(true);
+ int numOfRows = 0;
+ createTableAndUpsertRows(conn, dataTableFullName, numOfRows,
tableDDLOptions);
+ SingleCellIndexIT.assertMetadata(conn,
PTable.ImmutableStorageScheme.ONE_CELL_PER_COLUMN,
PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS, dataTableFullName);
+
+ String createViewSql = "CREATE VIEW " + view1Name + " ( VIEW_COL11
INTEGER, VIEW_COL12 VARCHAR ) AS SELECT * FROM "
+ + dataTableFullName + " where ID=1";
+ conn.createStatement().execute(createViewSql);
+
+ createViewSql = "CREATE VIEW " + view2Name + " ( VIEW_COL21
INTEGER, VIEW_COL22 VARCHAR ) AS SELECT * FROM "
+ + dataTableFullName + " where ID=11";
+ conn.createStatement().execute(createViewSql);
+
+ PreparedStatement stmt1 =
conn.prepareStatement(String.format(upsertQuery, view1Name));
+ stmt1.setInt(1, 1);
+ stmt1.setString(2, "uname1");
+ stmt1.setInt(3, 95051);
+ stmt1.setString(4, "");
+ stmt1.setInt(5, 101);
+ stmt1.setString(6, "viewCol12");
+ stmt1.executeUpdate();
+ conn.commit();
+
+ stmt1 = conn.prepareStatement(String.format(upsertQuery,
view2Name));
+ stmt1.setInt(1, 11);
+ stmt1.setString(2, "uname11");
+ stmt1.setInt(3, 950511);
+ stmt1.setString(4, "");
+ stmt1.setInt(5, 111);
+ stmt1.setString(6, "viewCol22");
+ stmt1.executeUpdate();
+ conn.commit();
+
+ conn.createStatement().execute("ALTER TABLE " + dataTableFullName +
+ " SET
IMMUTABLE_STORAGE_SCHEME=SINGLE_CELL_ARRAY_WITH_OFFSETS,
COLUMN_ENCODED_BYTES=2");
+ SystemTransformRecord record =
Transform.getTransformRecord(schemaName, dataTableName, null, null,
conn.unwrap(PhoenixConnection.class));
+ assertNotNull(record);
+ assertMetadata(conn,
PTable.ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS,
PTable.QualifierEncodingScheme.TWO_BYTE_QUALIFIERS,
record.getNewPhysicalTableName());
+
+ List<String> args = getArgList(schemaName, dataTableName, null,
+ null, null, null, false, false, false, false, false);
+ runTransformTool(args.toArray(new String[0]), 0);
+ Transform.doCutover(conn.unwrap(PhoenixConnection.class), record);
+
Transform.updateTransformRecord(conn.unwrap(PhoenixConnection.class), record,
PTable.TransformStatus.COMPLETED);
+ try (Admin admin =
conn.unwrap(PhoenixConnection.class).getQueryServices().getAdmin()) {
+ admin.disableTable(TableName.valueOf(dataTableFullName));
+ admin.truncateTable(TableName.valueOf(dataTableFullName),
true);
+ }
+
+ String sql = "SELECT VIEW_COL11, VIEW_COL12 FROM %s ";
+ ResultSet rs1 =
conn.createStatement().executeQuery(String.format(sql, view1Name));
+ assertTrue(rs1.next());
+ assertEquals(101, rs1.getInt(1));
+ assertEquals("viewCol12", rs1.getString(2));
+
+ sql = "SELECT VIEW_COL21, VIEW_COL22 FROM %s ";
+ rs1 = conn.createStatement().executeQuery(String.format(sql,
view2Name));
+ assertTrue(rs1.next());
+ assertEquals(111, rs1.getInt(1));
+ assertEquals("viewCol22", rs1.getString(2));
+ }
+ }
+
+ @Test
+ public void testTransformForTenantViews() throws Exception {
+ String schemaName = generateUniqueName();
+ String dataTableName = generateUniqueName();
+ String dataTableFullName = SchemaUtil.getTableName(schemaName,
dataTableName);
+ String view1Name = "VW1_" + generateUniqueName();
+ String view2Name = "VW2_" + generateUniqueName();
+ String upsertQuery = "UPSERT INTO %s VALUES(?, ?, ?, ?, ?, ?)";
+
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+ conn.setAutoCommit(true);
+ int numOfRows = 0;
+ createTableAndUpsertRows(conn, dataTableFullName, numOfRows,
tableDDLOptions);
+ SingleCellIndexIT.assertMetadata(conn,
PTable.ImmutableStorageScheme.ONE_CELL_PER_COLUMN,
PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS, dataTableFullName);
+ }
+
+ try (Connection tenantConn1 = getTenantConnection("tenant1")) {
+ String createViewSql = "CREATE VIEW " + view1Name + " ( VIEW_COL11
INTEGER, VIEW_COL12 VARCHAR ) AS SELECT * FROM "
+ + dataTableFullName + " where ID=1";
+ tenantConn1.createStatement().execute(createViewSql);
+ }
+
+ try (Connection tenantConn2 = getTenantConnection("tenant2")) {
+ String createViewSql = "CREATE VIEW " + view2Name + " ( VIEW_COL21
INTEGER, VIEW_COL22 VARCHAR ) AS SELECT * FROM "
+ + dataTableFullName + " where ID=11";
+ tenantConn2.createStatement().execute(createViewSql);
+ }
+
+ try (Connection tenantConn1 = getTenantConnection("tenant1")) {
+ PreparedStatement stmt1 =
tenantConn1.prepareStatement(String.format(upsertQuery, view1Name));
+ stmt1.setInt(1, 1);
+ stmt1.setString(2, "uname1");
+ stmt1.setInt(3, 95051);
+ stmt1.setString(4, "");
+ stmt1.setInt(5, 101);
+ stmt1.setString(6, "viewCol12");
+ stmt1.executeUpdate();
+ tenantConn1.commit();
+ }
+
+ try (Connection tenantConn2 = getTenantConnection("tenant2")) {
+ PreparedStatement stmt1 =
tenantConn2.prepareStatement(String.format(upsertQuery, view2Name));
+ stmt1.setInt(1, 11);
+ stmt1.setString(2, "uname11");
+ stmt1.setInt(3, 950511);
+ stmt1.setString(4, "");
+ stmt1.setInt(5, 111);
+ stmt1.setString(6, "viewCol22");
+ stmt1.executeUpdate();
+ tenantConn2.commit();
+ }
+
+ try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+ conn.createStatement().execute("ALTER TABLE " + dataTableFullName +
+ " SET
IMMUTABLE_STORAGE_SCHEME=SINGLE_CELL_ARRAY_WITH_OFFSETS,
COLUMN_ENCODED_BYTES=2");
+ SystemTransformRecord record =
Transform.getTransformRecord(schemaName, dataTableName, null, null,
conn.unwrap(PhoenixConnection.class));
+ assertNotNull(record);
+ assertMetadata(conn,
PTable.ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS,
PTable.QualifierEncodingScheme.TWO_BYTE_QUALIFIERS,
record.getNewPhysicalTableName());
+
+ List<String> args = getArgList(schemaName, dataTableName, null,
+ null, null, null, false, false, false, false, false);
+ runTransformTool(args.toArray(new String[0]), 0);
+ Transform.doCutover(conn.unwrap(PhoenixConnection.class), record);
+
Transform.updateTransformRecord(conn.unwrap(PhoenixConnection.class), record,
PTable.TransformStatus.COMPLETED);
+ try (Admin admin =
conn.unwrap(PhoenixConnection.class).getQueryServices().getAdmin()) {
+ admin.disableTable(TableName.valueOf(dataTableFullName));
+ admin.truncateTable(TableName.valueOf(dataTableFullName),
true);
+ }
+ }
+
+ try (Connection tenantConn1 = getTenantConnection("tenant1")) {
+ String sql = "SELECT VIEW_COL11, VIEW_COL12 FROM %s ";
+ ResultSet rs1 =
tenantConn1.createStatement().executeQuery(String.format(sql, view1Name));
+ assertTrue(rs1.next());
+ assertEquals(101, rs1.getInt(1));
+ assertEquals("viewCol12", rs1.getString(2));
+ }
+
+ try (Connection tenantConn2 = getTenantConnection("tenant2")) {
+ String sql = "SELECT VIEW_COL21, VIEW_COL22 FROM %s ";
+ ResultSet rs1 =
tenantConn2.createStatement().executeQuery(String.format(sql, view2Name));
+ assertTrue(rs1.next());
+ assertEquals(111, rs1.getInt(1));
+ assertEquals("viewCol22", rs1.getString(2));
+ }
+ }
+
+
+ public static Connection getTenantConnection(String tenant) throws
SQLException {
+ Properties props = new Properties();
+ props.setProperty(PhoenixRuntime.TENANT_ID_ATTRIB, tenant);
+ return DriverManager.getConnection(getUrl(), props);
+ }
+
public static void assertTransformStatusOrPartial(PTable.TransformStatus
expectedStatus, SystemTransformRecord systemTransformRecord) {
if
(systemTransformRecord.getTransformStatus().equals(expectedStatus.name())) {
return;
diff --git
a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputFormat.java
b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputFormat.java
index c294fed..4eeb158 100644
---
a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputFormat.java
+++
b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputFormat.java
@@ -87,7 +87,7 @@ public class PhoenixInputFormat<T extends DBWritable> extends
InputFormat<NullWr
return generateSplits(queryPlan, configuration);
}
- private List<InputSplit> generateSplits(final QueryPlan qplan,
Configuration config) throws IOException {
+ protected List<InputSplit> generateSplits(final QueryPlan qplan,
Configuration config) throws IOException {
// We must call this in order to initialize the scans and splits from
the query plan
setupParallelScansFromQueryPlan(qplan);
final List<KeyRange> splits = qplan.getSplits();
diff --git
a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/transform/PhoenixTransformWithViewsInputFormat.java
b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/transform/PhoenixTransformWithViewsInputFormat.java
new file mode 100644
index 0000000..8156ba8
--- /dev/null
+++
b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/transform/PhoenixTransformWithViewsInputFormat.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce.transform;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.lib.db.DBWritable;
+import org.apache.phoenix.compile.MutationPlan;
+import org.apache.phoenix.compile.QueryPlan;
+import org.apache.phoenix.compile.ServerBuildTransformingTableCompiler;
+import org.apache.phoenix.coprocessor.TableInfo;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.mapreduce.PhoenixServerBuildIndexInputFormat;
+import org.apache.phoenix.mapreduce.util.ConnectionUtil;
+import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
+import org.apache.phoenix.schema.PColumn;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.transform.Transform;
+import org.apache.phoenix.thirdparty.com.google.common.base.Strings;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.SchemaUtil;
+import org.apache.phoenix.util.ViewUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Properties;
+
+import static
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CHILD_LINK_NAME_BYTES;
+import static
org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.getIndexToolIndexTableName;
+
+public class PhoenixTransformWithViewsInputFormat<T extends DBWritable>
extends PhoenixServerBuildIndexInputFormat {
+ private static final Logger LOGGER =
+
LoggerFactory.getLogger(PhoenixTransformWithViewsInputFormat.class);
+ @Override
+ public List<InputSplit> getSplits(JobContext context) throws IOException,
InterruptedException {
+ final Configuration configuration = context.getConfiguration();
+ try (PhoenixConnection connection = (PhoenixConnection)
+ ConnectionUtil.getInputConnection(configuration)) {
+ try (Table hTable =
connection.unwrap(PhoenixConnection.class).getQueryServices().getTable(
+
SchemaUtil.getPhysicalTableName(SYSTEM_CHILD_LINK_NAME_BYTES,
configuration).toBytes())) {
+ String oldDataTableFullName =
PhoenixConfigurationUtil.getIndexToolDataTableName(configuration);
+ String newDataTableFullName =
getIndexToolIndexTableName(configuration);
+ PTable newDataTable =
PhoenixRuntime.getTableNoCache(connection, newDataTableFullName);
+ String schemaName =
SchemaUtil.getSchemaNameFromFullName(oldDataTableFullName);
+ String tableName =
SchemaUtil.getTableNameFromFullName(oldDataTableFullName);
+ byte[] schemaNameBytes = Strings.isNullOrEmpty(schemaName) ?
null : schemaName.getBytes();
+ Pair<List<PTable>, List<TableInfo>> allDescendantViews =
ViewUtil.findAllDescendantViews(hTable, configuration, null, schemaNameBytes,
+ tableName.getBytes(),
EnvironmentEdgeManager.currentTimeMillis(), false);
+ List<PTable> legitimateDecendants =
allDescendantViews.getFirst();
+
+ List<InputSplit> inputSplits = new ArrayList<>();
+
+ HashMap<String, PColumn> columnMap = new HashMap<>();
+ for (PColumn column : newDataTable.getColumns()) {
+ columnMap.put(column.getName().getString(), column);
+ }
+
+ for (PTable decendant : legitimateDecendants) {
+ if (decendant.getViewType() == PTable.ViewType.READ_ONLY) {
+ continue;
+ }
+ PTable newView = Transform.getTransformedView(decendant,
newDataTable, columnMap, true);
+ QueryPlan queryPlan = getQueryPlan(newView, decendant,
connection);
+ inputSplits.addAll(generateSplits(queryPlan,
configuration));
+ }
+ if (inputSplits.size() == 0) {
+ // Get for base table
+ ServerBuildTransformingTableCompiler compiler = new
ServerBuildTransformingTableCompiler(connection,
+ oldDataTableFullName);
+ MutationPlan plan = compiler.compile(newDataTable);
+ inputSplits.addAll(generateSplits(plan.getQueryPlan(),
configuration));
+ }
+ return inputSplits;
+ }
+ } catch (Exception e) {
+ LOGGER.error("PhoenixTransformWithViewsInputFormat failed with: "
+ e.getMessage());
+ throw new RuntimeException(e);
+ }
+ }
+
+ private QueryPlan getQueryPlan(PTable newTable, PTable oldTable,
PhoenixConnection phoenixConnection) throws SQLException {
+ String tableTenantId = oldTable.getTenantId() == null?
null:oldTable.getTenantId().getString();
+ String connTenantId = phoenixConnection.getTenantId()==null?
null:phoenixConnection.getTenantId().getString();
+ if (!Strings.isNullOrEmpty(tableTenantId) &&
!StringUtils.equals(tableTenantId, connTenantId)) {
+ Properties props = new Properties();
+ props.setProperty(PhoenixRuntime.TENANT_ID_ATTRIB, tableTenantId);
+
+ try (PhoenixConnection tenantConnection = (PhoenixConnection)
+ DriverManager.getConnection(phoenixConnection.getURL(),
props)) {
+ return getQueryPlanInternal(newTable, oldTable,
tenantConnection);
+ }
+ }
+ return getQueryPlanInternal(newTable, oldTable, phoenixConnection);
+ }
+
+ private QueryPlan getQueryPlanInternal(PTable newTable, PTable decendant,
PhoenixConnection phoenixConnection) throws SQLException {
+ ServerBuildTransformingTableCompiler compiler = new
ServerBuildTransformingTableCompiler(phoenixConnection,
+ SchemaUtil.getTableName(decendant.getSchemaName(),
decendant.getTableName()).getString());
+
+ MutationPlan plan = compiler.compile(newTable);
+ return plan.getQueryPlan();
+ }
+}
diff --git
a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/transform/TransformTool.java
b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/transform/TransformTool.java
index 1159e73..ff83e1b 100644
---
a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/transform/TransformTool.java
+++
b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/transform/TransformTool.java
@@ -18,6 +18,7 @@
package org.apache.phoenix.mapreduce.transform;
import org.apache.hadoop.hbase.client.Admin;
+import org.apache.phoenix.mapreduce.PhoenixTTLTool;
import
org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.phoenix.thirdparty.com.google.common.base.Strings;
import org.apache.phoenix.thirdparty.com.google.common.collect.Lists;
@@ -83,6 +84,7 @@ import org.apache.phoenix.util.PhoenixRuntime;
import org.apache.phoenix.util.QueryUtil;
import org.apache.phoenix.util.SchemaUtil;
import org.apache.phoenix.util.TransactionUtil;
+import org.apache.phoenix.util.ViewUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -97,6 +99,7 @@ import java.util.List;
import java.util.UUID;
import static org.apache.hadoop.hbase.HConstants.EMPTY_BYTE_ARRAY;
+import static
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CHILD_LINK_NAME_BYTES;
import static
org.apache.phoenix.mapreduce.index.IndexTool.createIndexToolTables;
import static org.apache.phoenix.mapreduce.index.IndexTool.isTimeRangeSet;
import static org.apache.phoenix.mapreduce.index.IndexTool.validateTimeRange;
@@ -171,6 +174,9 @@ public class TransformTool extends Configured implements
Tool {
private static final Option END_TIME_OPTION = new Option("et", "end-time",
true, "End time for transform");
+ private static final Option SPLIT_SIZE_OPTION = new Option("ms",
"split-size-per-mapper", true,
+ "Define split size for each mapper.");
+
public static final String TRANSFORM_JOB_NAME_TEMPLATE =
"PHOENIX_TRANS_%s.%s.%s";
public static final String PARTIAL_TRANSFORM_NOT_APPLICABLE = "Partial
transform accepts "
@@ -219,6 +225,7 @@ public class TransformTool extends Configured implements
Tool {
private boolean shouldFixUnverified;
private boolean shouldUseNewTableAsSource;
private boolean shouldForceCutover;
+ private int splitSize;
private Job job;
public Long getStartTime() {
@@ -265,6 +272,7 @@ public class TransformTool extends Configured implements
Tool {
options.addOption(PARTIAL_TRANSFORM_OPTION);
options.addOption(START_TIME_OPTION);
options.addOption(END_TIME_OPTION);
+ options.addOption(SPLIT_SIZE_OPTION);
options.addOption(FIX_UNVERIFIED_TRANSFORM_OPTION);
options.addOption(FORCE_CUTOVER_OPTION);
options.addOption(USE_NEW_TABLE_AS_SOURCE_OPTION);
@@ -350,6 +358,11 @@ public class TransformTool extends Configured implements
Tool {
indexTable = cmdLine.getOptionValue(INDEX_TABLE_OPTION.getOpt());
qDataTable = SchemaUtil.getQualifiedTableName(schemaName, dataTable);
isForeground = cmdLine.hasOption(RUN_FOREGROUND_OPTION.getOpt());
+ if (cmdLine.hasOption(SPLIT_SIZE_OPTION.getOpt())) {
+ splitSize =
Integer.parseInt(cmdLine.getOptionValue(SPLIT_SIZE_OPTION.getOpt()));
+ } else {
+ splitSize = PhoenixTTLTool.DEFAULT_MAPPER_SPLIT_SIZE;
+ }
logicalTableName = dataTable;
logicalParentName = null;
if (!Strings.isNullOrEmpty(indexTable)) {
@@ -574,12 +587,28 @@ public class TransformTool extends Configured implements
Tool {
fs = outputPath.getFileSystem(configuration);
fs.delete(outputPath, true);
}
+ PhoenixConfigurationUtil.setMultiInputMapperSplitSize(configuration,
splitSize);
+
this.job = Job.getInstance(getConf(), jobName);
job.setJarByClass(TransformTool.class);
job.setPriority(this.jobPriority);
- PhoenixMapReduceUtil.setInput(job,
PhoenixServerBuildIndexDBWritable.class,
PhoenixServerBuildIndexInputFormat.class,
- oldTableWithSchema, "");
+ boolean hasChildViews = false;
+ try (Table hTable =
connection.unwrap(PhoenixConnection.class).getQueryServices().getTable(
+ SchemaUtil.getPhysicalTableName(SYSTEM_CHILD_LINK_NAME_BYTES,
configuration).toBytes())) {
+ byte[] tenantIdBytes = Strings.isNullOrEmpty(tenantId) ? null :
tenantId.getBytes();
+ byte[] schemaNameBytes = Strings.isNullOrEmpty(schemaName) ? null
: schemaName.getBytes();
+ hasChildViews = ViewUtil.hasChildViews(hTable, tenantIdBytes,
schemaNameBytes,
+ pOldTable.getTableName().getBytes(),
HConstants.LATEST_TIMESTAMP);
+ }
+
+ if (hasChildViews && Strings.isNullOrEmpty(tenantId)) {
+ PhoenixMapReduceUtil.setInput(job,
PhoenixServerBuildIndexDBWritable.class,
PhoenixTransformWithViewsInputFormat.class,
+ oldTableWithSchema, "");
+ } else {
+ PhoenixMapReduceUtil.setInput(job,
PhoenixServerBuildIndexDBWritable.class,
PhoenixServerBuildIndexInputFormat.class,
+ oldTableWithSchema, "");
+ }
if (outputPath != null) {
FileOutputFormat.setOutputPath(job, outputPath);
}
diff --git
a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
index c696bb4..9663653 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
@@ -896,6 +896,13 @@ public class MetaDataClient {
MetaDataMutationResult parentResult =
updateCache(connection.getTenantId(), parentSchemaName, tableName,
false, resolvedTimestamp);
PTable parentTable = parentResult.getTable();
+ if (parentResult.getMutationCode() ==
MutationCode.TABLE_NOT_FOUND || parentTable == null) {
+ // Try once more with different tenant id (connection can
be global but view could be tenant
+ parentResult =
+ updateCache(table.getTenantId(), parentSchemaName,
tableName, false,
+ resolvedTimestamp);
+ parentTable = parentResult.getTable();
+ }
if (LOGGER.isTraceEnabled()) {
LOGGER.trace("addColumnsAndIndexesFromAncestors parent
logical name " + table.getBaseTableLogicalName().getString() + " parent name "
+ table.getParentName().getString() + " tableName=" + table.getName());
}
diff --git
a/phoenix-core/src/main/java/org/apache/phoenix/schema/transform/Transform.java
b/phoenix-core/src/main/java/org/apache/phoenix/schema/transform/Transform.java
index 7276c65..ab4aac4 100644
---
a/phoenix-core/src/main/java/org/apache/phoenix/schema/transform/Transform.java
+++
b/phoenix-core/src/main/java/org/apache/phoenix/schema/transform/Transform.java
@@ -61,6 +61,7 @@ import java.sql.Timestamp;
import java.sql.Types;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -463,11 +464,16 @@ public class Transform {
// We need to update the columns's qualifiers as well
mutateColumns(connection.unwrap(PhoenixConnection.class),
pOldTable, pNewTable);
+ HashMap<String, PColumn> columnMap = new HashMap<>();
+ for (PColumn column : pNewTable.getColumns()) {
+ columnMap.put(column.getName().getString(), column);
+ }
+
// Also update view column qualifiers
for (TableInfo view : childViewsResult.getLinks()) {
PTable pView = PhoenixRuntime.getTable(connection,
view.getTenantId()==null? null: Bytes.toString(view.getTenantId())
, SchemaUtil.getTableName(view.getSchemaName(),
view.getTableName()));
-
mutateViewColumns(connection.unwrap(PhoenixConnection.class), pView, pNewTable);
+
mutateViewColumns(connection.unwrap(PhoenixConnection.class), pView, pNewTable,
columnMap);
}
}
connection.commit();
@@ -612,13 +618,16 @@ public class Transform {
}
}
- private static void mutateViewColumns(PhoenixConnection connection, PTable
pView, PTable pNewTable) throws SQLException {
- if (pView.getEncodingScheme() != pNewTable.getEncodingScheme()) {
+ public static PTable getTransformedView(PTable pOldView, PTable pNewTable,
HashMap<String, PColumn> columnMap, boolean withDerivedColumns) throws
SQLException {
+ List<PColumn> newColumns = new ArrayList<>();
+ PTable pNewView = null;
+ if (pOldView.getEncodingScheme() != pNewTable.getEncodingScheme()) {
Short nextKeySeq = 0;
PTable.EncodedCQCounter cqCounterToUse =
pNewTable.getEncodedCQCounter();
String defaultColumnFamily = pNewTable.getDefaultFamilyName() !=
null && !Strings.isNullOrEmpty(pNewTable.getDefaultFamilyName().getString()) ?
pNewTable.getDefaultFamilyName().getString() :
DEFAULT_COLUMN_FAMILY;
- for (PColumn column : pView.getColumns()) {
+
+ for (PColumn column : pOldView.getColumns()) {
boolean isPk = SchemaUtil.isPKColumn(column);
Short keySeq = isPk ? ++nextKeySeq : null;
if (isPk) {
@@ -630,15 +639,18 @@ public class Transform {
} else {
familyName = defaultColumnFamily;
}
- int encodedCQ = pView.isAppendOnlySchema() ?
Integer.valueOf(ENCODED_CQ_COUNTER_INITIAL_VALUE + keySeq) :
cqCounterToUse.getNextQualifier(familyName);
- if (!pView.isAppendOnlySchema()) {
- cqCounterToUse.increment(familyName);
- }
-
+ int encodedCQ = pOldView.isAppendOnlySchema() ?
Integer.valueOf(ENCODED_CQ_COUNTER_INITIAL_VALUE + keySeq) :
cqCounterToUse.getNextQualifier(familyName);
byte[] colQualifierBytes =
EncodedColumnsUtil.getColumnQualifierBytes(column.getName().getString(),
encodedCQ, pNewTable, isPk);
+ if (columnMap.containsKey(column.getName().getString())) {
+ colQualifierBytes =
columnMap.get(column.getName().getString()).getColumnQualifierBytes();
+ } else {
+ if (!column.isDerived()) {
+ cqCounterToUse.increment(familyName);
+ }
+ }
- if (column.isDerived()) {
+ if (!withDerivedColumns && column.isDerived()) {
// Don't need to add/change derived columns
continue;
}
@@ -648,8 +660,37 @@ public class Transform {
, column.getArraySize(),
column.getViewConstant(), column.isViewReferenced(),
column.getExpressionStr(), column.isRowTimestamp(),
column.isDynamic(), colQualifierBytes,
EnvironmentEdgeManager.currentTimeMillis());
- String tenantId = pView.getTenantId() == null?
null:pView.getTenantId().getString();
- addColumnMutation(connection, tenantId,
pView.getSchemaName()==null?null:pView.getSchemaName().getString()
+ newColumns.add(newCol);
+ if (!columnMap.containsKey(newCol.getName().getString())) {
+ columnMap.put(newCol.getName().getString(), newCol) ;
+ }
+ }
+
+ pNewView = PTableImpl.builderWithColumns(pOldView, newColumns)
+
.setQualifierEncodingScheme(pNewTable.getEncodingScheme())
+
.setImmutableStorageScheme(pNewTable.getImmutableStorageScheme())
+ .setPhysicalNames(
+
Collections.singletonList(SchemaUtil.getPhysicalHBaseTableName(
+ pNewTable.getSchemaName(),
pNewTable.getTableName(), pNewTable.isNamespaceMapped())))
+ .build();
+ } else {
+ // Have to change this per transform type
+ }
+ return pNewView;
+ }
+
+ private static void mutateViewColumns(PhoenixConnection connection, PTable
pView, PTable pNewTable, HashMap<String, PColumn> columnMap) throws
SQLException {
+ if (pView.getEncodingScheme() != pNewTable.getEncodingScheme()) {
+ Short nextKeySeq = 0;
+ PTable newView = getTransformedView(pView, pNewTable,
columnMap,false);
+ for (PColumn newCol : newView.getColumns()) {
+ boolean isPk = SchemaUtil.isPKColumn(newCol);
+ Short keySeq = isPk ? ++nextKeySeq : null;
+ if (isPk) {
+ continue;
+ }
+ String tenantId = pView.getTenantId() == null ? null :
pView.getTenantId().getString();
+ addColumnMutation(connection, tenantId, pView.getSchemaName()
== null ? null : pView.getSchemaName().getString()
, pView.getTableName().getString(), newCol,
pView.getParentTableName() == null ? null :
pView.getParentTableName().getString()
, pView.getPKName() == null ? null :
pView.getPKName().getString(), keySeq, pView.getBucketNum() != null);
diff --git
a/phoenix-core/src/main/java/org/apache/phoenix/schema/transform/TransformMaintainer.java
b/phoenix-core/src/main/java/org/apache/phoenix/schema/transform/TransformMaintainer.java
index df33c41..c084a27 100644
---
a/phoenix-core/src/main/java/org/apache/phoenix/schema/transform/TransformMaintainer.java
+++
b/phoenix-core/src/main/java/org/apache/phoenix/schema/transform/TransformMaintainer.java
@@ -90,6 +90,8 @@ public class TransformMaintainer extends IndexMaintainer {
private int nOldTableCFs;
private boolean newTableWALDisabled;
private boolean newTableImmutableRows;
+ private Set<ColumnReference> allColumns;
+
// Transient state
private final boolean isOldTableSalted;
private final RowKeySchema oldTableRowKeySchema;
@@ -133,9 +135,13 @@ public class TransformMaintainer extends IndexMaintainer {
}
public Set<ColumnReference> getAllColumns() {
- return new HashSet<>();
+ return allColumns;
}
+ public Set<ColumnReference> getCoveredColumns() {
+ return coveredColumnsMap.keySet();
+ }
+
private TransformMaintainer(final PTable oldTable, final PTable newTable,
PhoenixConnection connection) {
this(oldTable.getRowKeySchema(), oldTable.getBucketNum() != null);
this.newTableRowKeyOrderOptimizable =
newTable.rowKeyOrderOptimizable();
@@ -250,6 +256,8 @@ public class TransformMaintainer extends IndexMaintainer {
* Init calculated state reading/creating
*/
private void initCachedState() {
+ this.allColumns =
Sets.newLinkedHashSetWithExpectedSize(newTableExpressions.size() +
coveredColumnsMap.size());
+
byte[] newTableEmptyKvQualifier =
EncodedColumnsUtil.getEmptyKeyValueInfo(newTableEncodingScheme).getFirst();
byte[] oldTableEmptyKvQualifier =
EncodedColumnsUtil.getEmptyKeyValueInfo(oldTableEncodingScheme).getFirst();
newTableEmptyKeyValueRef = new
ColumnReference(oldTableEmptyKeyValueCF, newTableEmptyKvQualifier);