This is an automated email from the ASF dual-hosted git repository.

gokcen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/master by this push:
     new 9de2d60  PHOENIX-6639 Read repair for transforming table
9de2d60 is described below

commit 9de2d60a8211464600ac7688b5d7be4c2f8cc3dd
Author: Gokcen Iskender <gisken...@salesforce.com>
AuthorDate: Tue Jan 4 15:58:37 2022 -0800

    PHOENIX-6639 Read repair for transforming table
    
    Signed-off-by: Gokcen Iskender <gokc...@gmail.com>
---
 .../phoenix/end2end/transform/TransformToolIT.java | 110 ++++++++++++++++++
 .../coprocessor/BaseScannerRegionObserver.java     |   1 +
 .../phoenix/coprocessor/ScanRegionObserver.java    |  16 ++-
 .../UngroupedAggregateRegionObserver.java          |   1 +
 .../apache/phoenix/index/GlobalIndexChecker.java   |  19 ++-
 .../phoenix/iterate/TableResultIterator.java       |   1 +
 .../phoenix/jdbc/PhoenixDatabaseMetaData.java      |   2 +-
 .../org/apache/phoenix/query/QueryConstants.java   |   4 +-
 .../schema/transform/SystemTransformRecord.java    |  12 +-
 .../apache/phoenix/schema/transform/Transform.java |   6 +-
 .../schema/transform/TransformMaintainer.java      |  26 ++++-
 .../java/org/apache/phoenix/util/ScanUtil.java     | 127 ++++++++++++++-------
 12 files changed, 268 insertions(+), 57 deletions(-)

diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/transform/TransformToolIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/transform/TransformToolIT.java
index b8b39c7..e0702f8 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/transform/TransformToolIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/transform/TransformToolIT.java
@@ -450,6 +450,116 @@ public class TransformToolIT extends ParallelStatsDisabledIT {
     }
 
     @Test
+    public void testTransformMutationReadRepair() throws Exception {
+        String schemaName = generateUniqueName();
+        String dataTableName = generateUniqueName();
+        String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName);
+
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+            conn.setAutoCommit(true);
+            int numOfRows = 0;
+            createTableAndUpsertRows(conn, dataTableFullName, numOfRows, tableDDLOptions);
+            SingleCellIndexIT.assertMetadata(conn, PTable.ImmutableStorageScheme.ONE_CELL_PER_COLUMN, PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS, dataTableFullName);
+
+            conn.createStatement().execute("ALTER TABLE " + dataTableFullName +
+                    " SET IMMUTABLE_STORAGE_SCHEME=SINGLE_CELL_ARRAY_WITH_OFFSETS, COLUMN_ENCODED_BYTES=2");
+            SystemTransformRecord record = Transform.getTransformRecord(schemaName, dataTableName, null, null, conn.unwrap(PhoenixConnection.class));
+            assertNotNull(record);
+            assertMetadata(conn, PTable.ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS, PTable.QualifierEncodingScheme.TWO_BYTE_QUALIFIERS, record.getNewPhysicalTableName());
+
+            String upsertQuery = String.format("UPSERT INTO %s VALUES(?, ?, ?)", dataTableFullName);
+            PreparedStatement stmt1 = conn.prepareStatement(upsertQuery);
+            IndexToolIT.upsertRow(stmt1, 1);
+
+            IndexRegionObserver.setFailPostIndexUpdatesForTesting(true);
+            IndexToolIT.upsertRow(stmt1, 2);
+
+            assertEquals(1, getRowCountForEmptyColValue(conn, record.getNewPhysicalTableName(), UNVERIFIED_BYTES));
+            assertEquals(1, getRowCountForEmptyColValue(conn, record.getNewPhysicalTableName(), VERIFIED_BYTES));
+
+            IndexRegionObserver.setFailPostIndexUpdatesForTesting(false);
+
+            Transform.doCutover(conn.unwrap(PhoenixConnection.class), record);
+            Transform.updateTransformRecord(conn.unwrap(PhoenixConnection.class), record, PTable.TransformStatus.COMPLETED);
+
+            assertEquals(1, getRowCountForEmptyColValue(conn, record.getNewPhysicalTableName(), UNVERIFIED_BYTES));
+            assertEquals(1, getRowCountForEmptyColValue(conn, record.getNewPhysicalTableName(), VERIFIED_BYTES));
+
+            // Now do read repair
+            String select = "SELECT * FROM " + dataTableFullName;
+            ResultSet rs = conn.createStatement().executeQuery(select);
+            assertTrue(rs.next());
+            assertTrue(rs.next());
+            assertFalse(rs.next());
+
+            assertEquals(0, getRowCountForEmptyColValue(conn, record.getNewPhysicalTableName(), UNVERIFIED_BYTES));
+            assertEquals(2, getRowCountForEmptyColValue(conn, record.getNewPhysicalTableName(), VERIFIED_BYTES));
+        } finally {
+            IndexRegionObserver.setFailPreIndexUpdatesForTesting(false);
+            IndexRegionObserver.setFailDataTableUpdatesForTesting(false);
+            IndexRegionObserver.setFailPostIndexUpdatesForTesting(false);
+        }
+    }
+
+    @Test
+    public void testTransformIndexReadRepair() throws Exception {
+        String schemaName = generateUniqueName();
+        String dataTableName = generateUniqueName();
+        String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName);
+        String indexTableName = "IDX_" + generateUniqueName();
+        String indexTableFullName = SchemaUtil.getTableName(schemaName, indexTableName);
+
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+            conn.setAutoCommit(true);
+            int numOfRows = 0;
+            createTableAndUpsertRows(conn, dataTableFullName, numOfRows, tableDDLOptions);
+            SingleCellIndexIT.assertMetadata(conn, PTable.ImmutableStorageScheme.ONE_CELL_PER_COLUMN, PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS, dataTableFullName);
+
+            conn.createStatement().execute("CREATE INDEX " + indexTableName + " ON " + dataTableFullName + " (NAME) INCLUDE (ZIP)");
+            SingleCellIndexIT.assertMetadata(conn, PTable.ImmutableStorageScheme.ONE_CELL_PER_COLUMN, PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS, indexTableFullName);
+            conn.createStatement().execute("ALTER INDEX " + indexTableName + " ON " + dataTableFullName +
+                    " ACTIVE IMMUTABLE_STORAGE_SCHEME=SINGLE_CELL_ARRAY_WITH_OFFSETS, COLUMN_ENCODED_BYTES=2");
+
+            SystemTransformRecord record = Transform.getTransformRecord(schemaName, indexTableName, dataTableFullName, null, conn.unwrap(PhoenixConnection.class));
+            assertNotNull(record);
+            assertMetadata(conn, PTable.ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS, PTable.QualifierEncodingScheme.TWO_BYTE_QUALIFIERS, record.getNewPhysicalTableName());
+
+            String upsertQuery = String.format("UPSERT INTO %s VALUES(?, ?, ?)", dataTableFullName);
+            PreparedStatement stmt1 = conn.prepareStatement(upsertQuery);
+            IndexToolIT.upsertRow(stmt1, 1);
+
+            IndexRegionObserver.setFailPostIndexUpdatesForTesting(true);
+            IndexToolIT.upsertRow(stmt1, 2);
+
+            assertEquals(1, getRowCountForEmptyColValue(conn, record.getNewPhysicalTableName(), UNVERIFIED_BYTES));
+            assertEquals(1, getRowCountForEmptyColValue(conn, record.getNewPhysicalTableName(), VERIFIED_BYTES));
+
+            IndexRegionObserver.setFailPostIndexUpdatesForTesting(false);
+
+            Transform.doCutover(conn.unwrap(PhoenixConnection.class), record);
+
+            assertEquals(1, getRowCountForEmptyColValue(conn, record.getNewPhysicalTableName(), UNVERIFIED_BYTES));
+            assertEquals(1, getRowCountForEmptyColValue(conn, record.getNewPhysicalTableName(), VERIFIED_BYTES));
+
+            // Now do read repair
+            String select = "SELECT NAME, ZIP FROM " + dataTableFullName;
+            ResultSet rs = conn.createStatement().executeQuery(select);
+            assertTrue(rs.next());
+            assertTrue(rs.next());
+            assertFalse(rs.next());
+
+            assertEquals(0, getRowCountForEmptyColValue(conn, record.getNewPhysicalTableName(), UNVERIFIED_BYTES));
+            assertEquals(2, getRowCountForEmptyColValue(conn, record.getNewPhysicalTableName(), VERIFIED_BYTES));
+        } finally {
+            IndexRegionObserver.setFailPreIndexUpdatesForTesting(false);
+            IndexRegionObserver.setFailDataTableUpdatesForTesting(false);
+            IndexRegionObserver.setFailPostIndexUpdatesForTesting(false);
+        }
+    }
+
+    @Test
     public void testTransformMutationFailureRepair() throws Exception {
         String schemaName = generateUniqueName();
         String dataTableName = generateUniqueName();
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
index cff74ec..8532b68 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
@@ -127,6 +127,7 @@ abstract public class BaseScannerRegionObserver extends CompatBaseScannerRegionO
     public static final String EMPTY_COLUMN_FAMILY_NAME = "_EmptyCFName";
     public static final String EMPTY_COLUMN_QUALIFIER_NAME = "_EmptyCQName";
     public static final String INDEX_ROW_KEY = "_IndexRowKey";
+    public static final String READ_REPAIR_TRANSFORMING_TABLE = "_ReadRepairTransformingTable";
     
     public final static byte[] REPLAY_TABLE_AND_INDEX_WRITES = PUnsignedTinyint.INSTANCE.toBytes(1);
     public final static byte[] REPLAY_ONLY_INDEX_WRITES = PUnsignedTinyint.INSTANCE.toBytes(2);
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java
index d985395..ee04f36 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java
@@ -25,6 +25,7 @@ import java.util.NavigableMap;
 import java.util.Optional;
 
 import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CoprocessorEnvironment;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Scan;
@@ -39,6 +40,9 @@ import org.apache.hadoop.io.WritableUtils;
 import org.apache.phoenix.coprocessor.generated.DynamicColumnMetaDataProtos;
 import org.apache.phoenix.coprocessor.generated.PTableProtos;
 import org.apache.phoenix.expression.OrderByExpression;
+import org.apache.phoenix.hbase.index.metrics.GlobalIndexCheckerSource;
+import org.apache.phoenix.hbase.index.metrics.MetricsIndexerSourceFactory;
+import org.apache.phoenix.index.GlobalIndexChecker;
 import org.apache.phoenix.iterate.NonAggregateRegionScannerFactory;
 import org.apache.phoenix.schema.PColumn;
 import org.apache.phoenix.schema.PColumnImpl;
@@ -69,9 +73,14 @@ public class ScanRegionObserver extends BaseScannerRegionObserver implements Reg
     public static final String WILDCARD_SCAN_INCLUDES_DYNAMIC_COLUMNS =
             "_WildcardScanIncludesDynCols";
 
+    private static boolean readRepairTransformingTable = false;
+    private static  GlobalIndexChecker.GlobalIndexScanner globalIndexScanner;
+    private static GlobalIndexChecker globalIndexChecker = new GlobalIndexChecker();
+    private static GlobalIndexCheckerSource metricsSource = MetricsIndexerSourceFactory.getInstance().getGlobalIndexCheckerSource();
+
     @Override
     public Optional<RegionObserver> getRegionObserver() {
-      return Optional.of(this);
+        return Optional.of(this);
     }
 
     public static void serializeIntoScan(Scan scan, int limit,
@@ -191,6 +200,11 @@ public class ScanRegionObserver extends BaseScannerRegionObserver implements Reg
     @Override
     protected RegionScanner doPostScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c, final Scan scan, final RegionScanner s) throws Throwable {
         NonAggregateRegionScannerFactory nonAggregateROUtil = new NonAggregateRegionScannerFactory(c.getEnvironment());
+        if (scan.getAttribute(BaseScannerRegionObserver.READ_REPAIR_TRANSFORMING_TABLE) != null) {
+            readRepairTransformingTable = true;
+            globalIndexScanner = globalIndexChecker.new GlobalIndexScanner(c.getEnvironment(), scan, s, metricsSource);
+            return nonAggregateROUtil.getRegionScanner(scan, globalIndexScanner);
+        }
         return nonAggregateROUtil.getRegionScanner(scan, s);
     }
 
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
index e9a394d..1c7b1a1 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -413,6 +413,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
                 }
             });
         }
+
         boolean useNewValueColumnQualifier = EncodedColumnsUtil.useNewValueColumnQualifier(scan);
         int offsetToBe = 0;
         if (localIndexScan) {
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java b/phoenix-core/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java
index d9185cd..a220289 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java
@@ -18,6 +18,7 @@
 package org.apache.phoenix.index;
 
 import static org.apache.phoenix.index.IndexMaintainer.getIndexMaintainer;
+import static org.apache.phoenix.query.QueryConstants.UNVERIFIED_BYTES;
 import static org.apache.phoenix.query.QueryConstants.VERIFIED_BYTES;
 import static org.apache.phoenix.schema.types.PDataType.TRUE_BYTES;
 import static org.apache.phoenix.util.ScanUtil.getDummyResult;
@@ -64,6 +65,7 @@ import org.apache.phoenix.hbase.index.metrics.MetricsIndexerSourceFactory;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.schema.SortOrder;
+import org.apache.phoenix.schema.transform.TransformMaintainer;
 import org.apache.phoenix.schema.types.PLong;
 import org.apache.phoenix.util.EnvironmentEdgeManager;
 import org.apache.phoenix.util.IndexUtil;
@@ -123,7 +125,7 @@ public class GlobalIndexChecker extends BaseScannerRegionObserver implements Reg
      * An instance of this class is created for each scanner on an index
      * and used to verify individual rows and rebuild them if they are not valid
      */
-    private class GlobalIndexScanner extends BaseRegionScanner {
+    public class GlobalIndexScanner extends BaseRegionScanner {
         private RegionScanner scanner;
         private long ageThreshold;
         private Scan scan;
@@ -548,9 +550,18 @@ public class GlobalIndexChecker extends BaseScannerRegionObserver implements Reg
             while (cellIterator.hasNext()) {
                 cell = cellIterator.next();
                 if (isEmptyColumn(cell)) {
-                    if (Bytes.compareTo(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength(),
-                            VERIFIED_BYTES, 0, VERIFIED_BYTES.length) != 0) {
-                        return false;
+                    if (indexMaintainer instanceof TransformMaintainer) {
+                        // This is a transforming table. After cutover, if there are new mutations on the table,
+                        // their empty col value would be x. So, we are only interested in unverified ones.
+                        if (Bytes.compareTo(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength(),
+                                UNVERIFIED_BYTES, 0, UNVERIFIED_BYTES.length) == 0) {
+                            return false;
+                        }
+                    } else {
+                        if (Bytes.compareTo(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength(),
+                                VERIFIED_BYTES, 0, VERIFIED_BYTES.length) != 0) {
+                            return false;
+                        }
                     }
                     // Empty column is not supposed to be returned to the client except it is the only column included
                     // in the scan
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
index 5c8d6c7..9342ddc 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
@@ -26,6 +26,7 @@ import static org.apache.phoenix.iterate.TableResultIterator.RenewLeaseStatus.NO
 import static org.apache.phoenix.iterate.TableResultIterator.RenewLeaseStatus.RENEWED;
 import static org.apache.phoenix.iterate.TableResultIterator.RenewLeaseStatus.THRESHOLD_NOT_REACHED;
 import static org.apache.phoenix.iterate.TableResultIterator.RenewLeaseStatus.UNINITIALIZED;
+import static org.apache.phoenix.schema.types.PDataType.TRUE_BYTES;
 
 import java.io.IOException;
 import java.sql.SQLException;
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
index 28e819b..eba2f1a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
@@ -225,7 +225,7 @@ public class PhoenixDatabaseMetaData implements DatabaseMetaData {
     public static final String OLD_METADATA = "OLD_METADATA";
     public static final String NEW_METADATA = "NEW_METADATA";
     public static final String TRANSFORM_FUNCTION = "TRANSFORM_FUNCTION";
-    public static final String TRANSFORM_TABLE_TTL = "864000";
+    public static final String TRANSFORM_TABLE_TTL = "7776000"; // 90 days
 
     public static final int TTL_FOR_MUTEX = 15 * 60; // 15min
     public static final String ARRAY_SIZE = "ARRAY_SIZE";
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
index cf203da..f465412 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
@@ -571,14 +571,14 @@ public interface QueryConstants {
             TRANSFORM_RETRY_COUNT + " INTEGER NULL," +
             TRANSFORM_START_TS + " TIMESTAMP NULL," +
             TRANSFORM_LAST_STATE_TS + " TIMESTAMP NULL," +
-            OLD_METADATA + " VARCHAR NULL,\n" +
+            OLD_METADATA + " VARBINARY NULL,\n" +
             NEW_METADATA + " VARCHAR NULL,\n" +
             TRANSFORM_FUNCTION + " VARCHAR NULL\n" +
             "CONSTRAINT " + SYSTEM_TABLE_PK_NAME + " PRIMARY KEY (" +
             TENANT_ID + "," + TABLE_SCHEM + "," + LOGICAL_TABLE_NAME + "))\n" +
             HConstants.VERSIONS + "=%s,\n" +
             ColumnFamilyDescriptorBuilder.KEEP_DELETED_CELLS + "=%s,\n" +
-            ColumnFamilyDescriptorBuilder.TTL + "=" + TRANSFORM_TABLE_TTL + ",\n" +     // 10 days
+            ColumnFamilyDescriptorBuilder.TTL + "=" + TRANSFORM_TABLE_TTL + ",\n" +     // 90 days
             TableDescriptorBuilder.SPLIT_POLICY + "='"
             + SystemTaskSplitPolicy.class.getName() + "',\n" +
             TRANSACTIONAL + "=" + Boolean.FALSE + ",\n" +
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/transform/SystemTransformRecord.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/transform/SystemTransformRecord.java
index 63378bb..d6fec5a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/transform/SystemTransformRecord.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/transform/SystemTransformRecord.java
@@ -45,14 +45,14 @@ public class SystemTransformRecord {
     private final Integer transformRetryCount;
     private final Timestamp startTs;
     private final Timestamp lastStateTs;
-    private final String oldMetadata;
+    private final byte[] oldMetadata;
     private final String newMetadata;
     private final String transformFunction;
 
     public SystemTransformRecord(PTable.TransformType transformType,
                                  String schemaName, String logicalTableName, String tenantId, String newPhysicalTableName, String logicalParentName,
                                  String transformStatus, String transformJobId, Integer transformRetryCount, Timestamp startTs,
-                                 Timestamp lastStateTs, String oldMetadata, String newMetadata, String transformFunction) {
+                                 Timestamp lastStateTs, byte[] oldMetadata, String newMetadata, String transformFunction) {
         this.transformType = transformType;
         this.schemaName = schemaName;
         this.tenantId = tenantId;
@@ -119,7 +119,7 @@ public class SystemTransformRecord {
         return lastStateTs;
     }
 
-    public String getOldMetadata() {
+    public byte[] getOldMetadata() {
         return oldMetadata;
     }
     public String getNewMetadata() {
@@ -149,7 +149,7 @@ public class SystemTransformRecord {
         private int transformRetryCount =0;
         private Timestamp startTs = new Timestamp(EnvironmentEdgeManager.currentTimeMillis());
         private Timestamp lastStateTs;
-        private String oldMetadata;
+        private byte[] oldMetadata;
         private String newMetadata;
         private String transformFunction;
 
@@ -214,7 +214,7 @@ public class SystemTransformRecord {
             return this;
         }
 
-        public SystemTransformBuilder setOldMetadata(String oldMetadata) {
+        public SystemTransformBuilder setOldMetadata(byte[] oldMetadata) {
             this.oldMetadata = oldMetadata;
             return this;
         }
@@ -268,7 +268,7 @@ public class SystemTransformRecord {
             builder.setTransformRetryCount(resultSet.getInt(col++));
             builder.setStartTs(resultSet.getTimestamp(col++));
             builder.setLastStateTs(resultSet.getTimestamp(col++));
-            builder.setOldMetadata(resultSet.getString(col++));
+            builder.setOldMetadata(resultSet.getBytes(col++));
             builder.setNewMetadata(resultSet.getString(col++));
             builder.setTransformFunction(resultSet.getString(col++));
 
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/transform/Transform.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/transform/Transform.java
index 8aecf1d..7276c65 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/transform/Transform.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/transform/Transform.java
@@ -109,7 +109,7 @@ public class Transform {
                                     long sequenceNum, PTable.TransformType transformType) throws SQLException {
         try {
             String newMetadata = JacksonUtil.getObjectWriter().writeValueAsString(changingProperties);
-            String oldMetadata = "";
+            byte[] oldMetadata = PTableImpl.toProto(table).toByteArray();
             String newPhysicalTableName = "";
             SystemTransformRecord.SystemTransformBuilder transformBuilder = new SystemTransformRecord.SystemTransformBuilder();
             String schema = table.getSchemaName()!=null ? table.getSchemaName().getString() : null;
@@ -401,9 +401,9 @@ public class Transform {
                 stmt.setNull(colNum++, Types.TIMESTAMP);
             }
             if (systemTransformParams.getOldMetadata() != null) {
-                stmt.setString(colNum++, systemTransformParams.getOldMetadata());
+                stmt.setBytes(colNum++, systemTransformParams.getOldMetadata());
             } else {
-                stmt.setNull(colNum++, Types.VARCHAR);
+                stmt.setNull(colNum++, Types.VARBINARY);
             }
             if (systemTransformParams.getNewMetadata() != null) {
                 stmt.setString(colNum++, systemTransformParams.getNewMetadata());
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/transform/TransformMaintainer.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/transform/TransformMaintainer.java
index da445f6..df33c41 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/transform/TransformMaintainer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/transform/TransformMaintainer.java
@@ -64,6 +64,7 @@ import java.io.DataInputStream;
 import java.io.DataOutput;
 import java.io.DataOutputStream;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashSet;
@@ -76,7 +77,7 @@ import static org.apache.phoenix.schema.PTable.QualifierEncodingScheme.NON_ENCOD
 
 public class TransformMaintainer extends IndexMaintainer {
     private boolean isMultiTenant;
-    // indexed expressions that are not present in the row key of the data table, the expression can also refer to a regular column
+    // expressions that are not present in the row key of the old table, the expression can also refer to a regular column
     private List<Expression> newTableExpressions;
     private Set<ColumnReference> newTableColumns;
 
@@ -196,6 +197,19 @@ public class TransformMaintainer extends IndexMaintainer {
         }
     }
 
+    public Set<ColumnReference> getAllColumnsForDataTable() {
+        Set<ColumnReference> result = Sets.newLinkedHashSetWithExpectedSize(newTableExpressions.size() + coveredColumnsMap.size());
+        result.addAll(newTableColumns);
+        for (ColumnReference colRef : coveredColumnsMap.keySet()) {
+            if (oldTableImmutableStorageScheme == PTable.ImmutableStorageScheme.ONE_CELL_PER_COLUMN) {
+                result.add(colRef);
+            } else {
+                result.add(new ColumnReference(colRef.getFamily(), QueryConstants.SINGLE_KEYVALUE_COLUMN_QUALIFIER_BYTES));
+            }
+        }
+        return result;
+    }
+
     /*
      * Build the old table row key
      */
@@ -376,6 +390,16 @@ public class TransformMaintainer extends IndexMaintainer {
         for (ServerCachingProtos.ColumnInfo info : newTblColumnInfoList) {
             maintainer.newTableColumnsInfo.add(new Pair<>(info.getFamilyName(), info.getColumnName()));
         }
+        maintainer.newTableExpressions = new ArrayList<>();
+        try (ByteArrayInputStream stream = new ByteArrayInputStream(proto.getNewTableExpressions().toByteArray())) {
+            DataInput input = new DataInputStream(stream);
+            while (stream.available() > 0) {
+                int expressionOrdinal = WritableUtils.readVInt(input);
+                Expression expression = ExpressionType.values()[expressionOrdinal].newInstance();
+                expression.readFields(input);
+                maintainer.newTableExpressions.add(expression);
+            }
+        }
         // proto doesn't support single byte so need an explicit cast here
         maintainer.newTableEncodingScheme = PTable.QualifierEncodingScheme.fromSerializedValue((byte) proto.getNewTableEncodingScheme());
         maintainer.newTableImmutableStorageScheme = PTable.ImmutableStorageScheme.fromSerializedValue((byte) proto.getNewTableImmutableStorageScheme());
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java
index 878d89e..2d57d8f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java
@@ -58,6 +58,7 @@ import org.apache.phoenix.compile.ScanRanges;
 import org.apache.phoenix.compile.StatementContext;
 import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
 import org.apache.phoenix.coprocessor.MetaDataProtocol;
+import org.apache.phoenix.coprocessor.generated.PTableProtos;
 import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.exception.SQLExceptionInfo;
 import org.apache.phoenix.execute.BaseQueryPlan;
@@ -85,11 +86,16 @@ import org.apache.phoenix.schema.PColumn;
 import org.apache.phoenix.schema.PName;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.PTable.IndexType;
+import org.apache.phoenix.schema.PTableImpl;
 import org.apache.phoenix.schema.PTableType;
 import org.apache.phoenix.schema.RowKeySchema;
 import org.apache.phoenix.schema.SortOrder;
 import org.apache.phoenix.schema.TableNotFoundException;
 import org.apache.phoenix.schema.ValueSchema.Field;
+import org.apache.phoenix.schema.tool.SchemaExtractionProcessor;
+import org.apache.phoenix.schema.transform.SystemTransformRecord;
+import org.apache.phoenix.schema.transform.Transform;
+import org.apache.phoenix.schema.transform.TransformMaintainer;
 import org.apache.phoenix.schema.tuple.ResultTuple;
 import org.apache.phoenix.schema.tuple.Tuple;
 import org.apache.phoenix.schema.types.PDataType;
@@ -1129,46 +1135,89 @@ public class ScanUtil {
     }
 
     public static void setScanAttributesForIndexReadRepair(Scan scan, PTable table, PhoenixConnection phoenixConnection) throws SQLException {
-        if (table.isTransactional() || table.getType() != PTableType.INDEX) {
-            return;
-        }
+        boolean isTransforming = (table.getTransformingNewTable() != null);
         PTable indexTable = table;
-        if (indexTable.getIndexType() != PTable.IndexType.GLOBAL) {
-            return;
-        }
-        String schemaName = indexTable.getParentSchemaName().getString();
-        String tableName = indexTable.getParentTableName().getString();
-        PTable dataTable;
-        try {
-            dataTable = PhoenixRuntime.getTable(phoenixConnection, SchemaUtil.getTableName(schemaName, tableName));
-        } catch (TableNotFoundException e) {
-            // This index table must be being deleted. No need to set the scan attributes
-            return;
-        }
-        // MetaDataClient modifies the index table name for view indexes if the parent view of an index has a child
-        // view. This, we need to recreate a PTable object with the correct table name for the rest of this code to work
-        if (indexTable.getViewIndexId() != null && indexTable.getName().getString().contains(QueryConstants.CHILD_VIEW_INDEX_NAME_SEPARATOR)) {
-            int lastIndexOf = indexTable.getName().getString().lastIndexOf(QueryConstants.CHILD_VIEW_INDEX_NAME_SEPARATOR);
-            String indexName = indexTable.getName().getString().substring(lastIndexOf + 1);
-            indexTable = PhoenixRuntime.getTable(phoenixConnection, indexName);
-        }
-        if (!dataTable.getIndexes().contains(indexTable)) {
-            return;
-        }
-        if (scan.getAttribute(PhoenixIndexCodec.INDEX_PROTO_MD) == null) {
-            ImmutableBytesWritable ptr = new ImmutableBytesWritable();
-            IndexMaintainer.serialize(dataTable, ptr, Collections.singletonList(indexTable), phoenixConnection);
-            scan.setAttribute(PhoenixIndexCodec.INDEX_PROTO_MD, ByteUtil.copyKeyBytesIfNecessary(ptr));
-        }
-        scan.setAttribute(BaseScannerRegionObserver.CHECK_VERIFY_COLUMN, TRUE_BYTES);
-        scan.setAttribute(BaseScannerRegionObserver.PHYSICAL_DATA_TABLE_NAME, dataTable.getPhysicalName().getBytes());
-        IndexMaintainer indexMaintainer = indexTable.getIndexMaintainer(dataTable, phoenixConnection);
-        byte[] emptyCF = indexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary();
-        byte[] emptyCQ = indexMaintainer.getEmptyKeyValueQualifier();
-        scan.setAttribute(BaseScannerRegionObserver.EMPTY_COLUMN_FAMILY_NAME, emptyCF);
-        scan.setAttribute(BaseScannerRegionObserver.EMPTY_COLUMN_QUALIFIER_NAME, emptyCQ);
-        if (scan.getAttribute(BaseScannerRegionObserver.VIEW_CONSTANTS) == null) {
-            BaseQueryPlan.serializeViewConstantsIntoScan(scan, dataTable);
+        // Transforming index table can be repaired in regular path via globalindexchecker coproc on it.
+        // phoenixConnection is closed when it is called from mappers
+        if (!phoenixConnection.isClosed() && table.getType() == PTableType.TABLE && isTransforming) {
+            SystemTransformRecord systemTransformRecord = Transform.getTransformRecord(indexTable.getSchemaName(), indexTable.getTableName(),
+                    null, phoenixConnection.getTenantId(), phoenixConnection);
+            if (systemTransformRecord == null) {
+                return;
+            }
+            // Old table is still active, cutover didn't happen yet, so, no need to read repair
+            if (!systemTransformRecord.getTransformStatus().equals(PTable.TransformStatus.COMPLETED.name())) {
+                return;
+            }
+            byte[] oldTableBytes = systemTransformRecord.getOldMetadata();
+            if (oldTableBytes == null || oldTableBytes.length == 0) {
+                return;
+            }
+            PTable oldTable = null;
+            try {
+                oldTable = PTableImpl.createFromProto(PTableProtos.PTable.parseFrom(oldTableBytes));
+            } catch (IOException e) {
+                LOGGER.error("Cannot parse old table info for read repair for table " + table.getName());
+                return;
+            }
+            TransformMaintainer indexMaintainer = indexTable.getTransformMaintainer(oldTable, phoenixConnection);
+            // This is the path where we are reading from the newly transformed table
+            if (scan.getAttribute(PhoenixIndexCodec.INDEX_PROTO_MD) == null) {
+                ImmutableBytesWritable ptr = new ImmutableBytesWritable();
+                TransformMaintainer.serialize(oldTable, ptr, indexTable, phoenixConnection);
+                scan.setAttribute(PhoenixIndexCodec.INDEX_PROTO_MD, ByteUtil.copyKeyBytesIfNecessary(ptr));
+            }
+            scan.setAttribute(BaseScannerRegionObserver.CHECK_VERIFY_COLUMN, TRUE_BYTES);
+            scan.setAttribute(BaseScannerRegionObserver.PHYSICAL_DATA_TABLE_NAME, oldTable.getPhysicalName().getBytes());
+            byte[] emptyCF = indexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary();
+            byte[] emptyCQ = indexMaintainer.getEmptyKeyValueQualifier();
+            scan.setAttribute(BaseScannerRegionObserver.EMPTY_COLUMN_FAMILY_NAME, emptyCF);
+            scan.setAttribute(BaseScannerRegionObserver.EMPTY_COLUMN_QUALIFIER_NAME, emptyCQ);
+            scan.setAttribute(BaseScannerRegionObserver.READ_REPAIR_TRANSFORMING_TABLE, TRUE_BYTES);
+        } else {
+            if (table.isTransactional() || table.getType() != PTableType.INDEX) {
+                return;
+            }
+            if (indexTable.getIndexType() != PTable.IndexType.GLOBAL) {
+                return;
+            }
+
+            String schemaName = indexTable.getParentSchemaName().getString();
+            String tableName = indexTable.getParentTableName().getString();
+            PTable dataTable;
+            try {
+                dataTable = PhoenixRuntime.getTable(phoenixConnection, SchemaUtil.getTableName(schemaName, tableName));
+            } catch (TableNotFoundException e) {
+                // This index table must be being deleted. No need to set the scan attributes
+                return;
+            }
+            // MetaDataClient modifies the index table name for view indexes if the parent view of an index has a child
+            // view. This, we need to recreate a PTable object with the correct table name for the rest of this code to work
+            if (indexTable.getViewIndexId() != null && indexTable.getName().getString().contains(QueryConstants.CHILD_VIEW_INDEX_NAME_SEPARATOR)) {
+                int lastIndexOf = indexTable.getName().getString().lastIndexOf(QueryConstants.CHILD_VIEW_INDEX_NAME_SEPARATOR);
+                String indexName = indexTable.getName().getString().substring(lastIndexOf + 1);
+                indexTable = PhoenixRuntime.getTable(phoenixConnection, indexName);
+            }
+            if (!dataTable.getIndexes().contains(indexTable)) {
+                return;
+            }
+
+            if (scan.getAttribute(PhoenixIndexCodec.INDEX_PROTO_MD) == null) {
+                ImmutableBytesWritable ptr = new ImmutableBytesWritable();
+                IndexMaintainer.serialize(dataTable, ptr, Collections.singletonList(indexTable), phoenixConnection);
+                scan.setAttribute(PhoenixIndexCodec.INDEX_PROTO_MD, ByteUtil.copyKeyBytesIfNecessary(ptr));
+            }
+            scan.setAttribute(BaseScannerRegionObserver.CHECK_VERIFY_COLUMN, TRUE_BYTES);
+            scan.setAttribute(BaseScannerRegionObserver.PHYSICAL_DATA_TABLE_NAME, dataTable.getPhysicalName().getBytes());
+            IndexMaintainer indexMaintainer = indexTable.getIndexMaintainer(dataTable, phoenixConnection);
+            byte[] emptyCF = indexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary();
+            byte[] emptyCQ = indexMaintainer.getEmptyKeyValueQualifier();
+            scan.setAttribute(BaseScannerRegionObserver.EMPTY_COLUMN_FAMILY_NAME, emptyCF);
+            scan.setAttribute(BaseScannerRegionObserver.EMPTY_COLUMN_QUALIFIER_NAME, emptyCQ);
+            if (scan.getAttribute(BaseScannerRegionObserver.VIEW_CONSTANTS) == null) {
+                BaseQueryPlan.serializeViewConstantsIntoScan(scan, dataTable);
+            }
+            addEmptyColumnToScan(scan, emptyCF, emptyCQ);
         }
     }
 
