gokceni commented on a change in pull request #1373:
URL: https://github.com/apache/phoenix/pull/1373#discussion_r788160549



##########
File path: phoenix-core/src/main/java/org/apache/phoenix/schema/transform/Transform.java
##########
@@ -379,71 +421,265 @@ public static void upsertTransform(
         }
     }
 
+    public static void doCutover(PhoenixConnection connection, SystemTransformRecord systemTransformRecord) throws Exception {
+        String tenantId = systemTransformRecord.getTenantId();
+        String schema = systemTransformRecord.getSchemaName();
+        String tableName = systemTransformRecord.getLogicalTableName();
+        String newTableName = SchemaUtil.getTableNameFromFullName(systemTransformRecord.getNewPhysicalTableName());
+
+        // Calculate changed metadata
+        List<String> columnNames = new ArrayList<>();
+        List<String> columnValues = new ArrayList<>();
+
+        getMetadataDifference(connection, systemTransformRecord, columnNames, columnValues);
+        // TODO In the future, we need to handle rowkey changes and column type changes as well
+
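+        // Templates for writing the cutover directly into SYSTEM.CATALOG: the table row
+        // gets the new PHYSICAL_TABLE_NAME, while view rows only get the changed metadata columns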
+        String changeViewStmt = "UPSERT INTO SYSTEM.CATALOG (TENANT_ID, TABLE_SCHEM, TABLE_NAME %s) VALUES (%s, %s, '%s' %s)";
+
+        String changeTable = String.format(
+                "UPSERT INTO SYSTEM.CATALOG (TENANT_ID, TABLE_SCHEM, TABLE_NAME, PHYSICAL_TABLE_NAME %s) VALUES (%s, %s, '%s','%s' %s)",
+                (columnNames.size() > 0 ? "," + String.join(",", columnNames) : ""),
+                (tenantId == null ? null : ("'" + tenantId + "'")),
+                (schema == null ? null : ("'" + schema + "'")), tableName, newTableName,
+                (columnValues.size() > 0 ? "," + String.join(",", columnValues) : ""));
+
+        LOGGER.info("About to do cutover via " + changeTable);
+        TableViewFinderResult childViewsResult = ViewUtil.findChildViews(connection, tenantId, schema, tableName);
+        boolean wasCommit = connection.getAutoCommit();
+        connection.setAutoCommit(false);
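+        // Track the views we update so their cached metadata can be refreshed afterwards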
+        List<TableInfo> viewsToUpdateCache = new ArrayList<>();
+        try {
+            connection.createStatement().execute(changeTable);
+
+            // Update column qualifiers
+            PTable pNewTable = PhoenixRuntime.getTable(connection, systemTransformRecord.getNewPhysicalTableName());
+            PTable pOldTable = PhoenixRuntime.getTable(connection, SchemaUtil.getTableName(schema, tableName));
+            if (pOldTable.getImmutableStorageScheme() != pNewTable.getImmutableStorageScheme()
+                    || pOldTable.getEncodingScheme() != pNewTable.getEncodingScheme()) {
+                MetaDataClient.mutateTransformProperties(connection, tenantId, schema, tableName, newTableName,
+                        pNewTable.getImmutableStorageScheme(), pNewTable.getEncodingScheme());
+                // We need to update the columns' qualifiers as well
+                mutateColumns(connection.unwrap(PhoenixConnection.class), pOldTable, pNewTable);
+
+                // Also update view column qualifiers
+                for (TableInfo view : childViewsResult.getLinks()) {
+                    PTable pView = PhoenixRuntime.getTable(connection,
+                            view.getTenantId() == null ? null : Bytes.toString(view.getTenantId()),
+                            SchemaUtil.getTableName(view.getSchemaName(), view.getTableName()));
+                    mutateViewColumns(connection.unwrap(PhoenixConnection.class), pView, pNewTable);
+                }
+                }
+            }
+            connection.commit();
+
+            // We can have millions of views. We need to send the updates in batches
+            int maxBatchSize = connection.getQueryServices().getConfiguration().getInt(MUTATE_BATCH_SIZE_ATTRIB,
+                    QueryServicesOptions.DEFAULT_MUTATE_BATCH_SIZE);
+            int batchSize = 0;
+            for (TableInfo view : childViewsResult.getLinks()) {
+                String changeView = String.format(changeViewStmt,
+                        (columnNames.size() > 0 ? "," + String.join(",", columnNames) : ""),
+                        (view.getTenantId() == null || view.getTenantId().length == 0 ? null : ("'" + Bytes.toString(view.getTenantId()) + "'")),
+                        (view.getSchemaName() == null || view.getSchemaName().length == 0 ? null : ("'" + Bytes.toString(view.getSchemaName()) + "'")),
+                        Bytes.toString(view.getTableName()),
+                        (columnValues.size() > 0 ? "," + String.join(",", columnValues) : ""));
+                LOGGER.info("Cutover changing view via " + changeView);
+                connection.createStatement().execute(changeView);
+                viewsToUpdateCache.add(view);
+                batchSize++;
+                if (batchSize >= maxBatchSize) {
+                    connection.commit();
+                    batchSize = 0;
+                }
+            }
+            if (batchSize > 0) {
+                connection.commit();
+                batchSize = 0;
+            }
+
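+            // Expire cached metadata so readers resolve against the new physical table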
+            connection.unwrap(PhoenixConnection.class).getQueryServices().clearCache();
+            UpgradeUtil.clearCacheAndGetNewTable(connection.unwrap(PhoenixConnection.class),
+                    connection.getTenantId() == null ? null : connection.getTenantId().getString(),
+                    schema, tableName, systemTransformRecord.getLogicalParentName(), MIN_TABLE_TIMESTAMP);
+            for (TableInfo view : viewsToUpdateCache) {
+                UpgradeUtil.clearCacheAndGetNewTable(connection.unwrap(PhoenixConnection.class),

Review comment:
       Good idea. Let me just expire them without querying.
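
       For reference, a minimal sketch of what "expire without querying" could look like (hypothetical, not the actual follow-up commit). Instead of UpgradeUtil.clearCacheAndGetNewTable, which clears and then re-resolves each view, the loop would only invalidate the cached entries. It assumes ConnectionQueryServices.clearTableFromCache(byte[], byte[], byte[], long) keeps its current signature, uses ByteUtil.EMPTY_BYTE_ARRAY for a null tenant/schema, and imports org.apache.hadoop.hbase.HConstants and org.apache.phoenix.util.ByteUtil:

           // Sketch only: expire each updated view from the metadata cache
           // without the getTable round trip that clearCacheAndGetNewTable does.
           ConnectionQueryServices services =
                   connection.unwrap(PhoenixConnection.class).getQueryServices();
           for (TableInfo view : viewsToUpdateCache) {
               services.clearTableFromCache(
                       view.getTenantId() == null ? ByteUtil.EMPTY_BYTE_ARRAY : view.getTenantId(),
                       view.getSchemaName() == null ? ByteUtil.EMPTY_BYTE_ARRAY : view.getSchemaName(),
                       view.getTableName(),
                       HConstants.LATEST_TIMESTAMP); // next access re-fetches fresh metadata
           }

       The trade-off is that the first query against each view after cutover pays the re-resolution cost, rather than paying it eagerly here.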




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

