This is an automated email from the ASF dual-hosted git repository. gokcen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/master by this push: new 9de2d60 PHOENIX-6639 Read repair for transforming table 9de2d60 is described below commit 9de2d60a8211464600ac7688b5d7be4c2f8cc3dd Author: Gokcen Iskender <gisken...@salesforce.com> AuthorDate: Tue Jan 4 15:58:37 2022 -0800 PHOENIX-6639 Read repair for transforming table Signed-off-by: Gokcen Iskender <gokc...@gmail.com> --- .../phoenix/end2end/transform/TransformToolIT.java | 110 ++++++++++++++++++ .../coprocessor/BaseScannerRegionObserver.java | 1 + .../phoenix/coprocessor/ScanRegionObserver.java | 16 ++- .../UngroupedAggregateRegionObserver.java | 1 + .../apache/phoenix/index/GlobalIndexChecker.java | 19 ++- .../phoenix/iterate/TableResultIterator.java | 1 + .../phoenix/jdbc/PhoenixDatabaseMetaData.java | 2 +- .../org/apache/phoenix/query/QueryConstants.java | 4 +- .../schema/transform/SystemTransformRecord.java | 12 +- .../apache/phoenix/schema/transform/Transform.java | 6 +- .../schema/transform/TransformMaintainer.java | 26 ++++- .../java/org/apache/phoenix/util/ScanUtil.java | 127 ++++++++++++++------- 12 files changed, 268 insertions(+), 57 deletions(-) diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/transform/TransformToolIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/transform/TransformToolIT.java index b8b39c7..e0702f8 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/transform/TransformToolIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/transform/TransformToolIT.java @@ -450,6 +450,116 @@ public class TransformToolIT extends ParallelStatsDisabledIT { } @Test + public void testTransformMutationReadRepair() throws Exception { + String schemaName = generateUniqueName(); + String dataTableName = generateUniqueName(); + String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName); + + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + try (Connection conn = DriverManager.getConnection(getUrl(), props)) { + conn.setAutoCommit(true); + int numOfRows = 0; + createTableAndUpsertRows(conn, dataTableFullName, numOfRows, tableDDLOptions); + SingleCellIndexIT.assertMetadata(conn, PTable.ImmutableStorageScheme.ONE_CELL_PER_COLUMN, PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS, dataTableFullName); + + conn.createStatement().execute("ALTER TABLE " + dataTableFullName + + " SET IMMUTABLE_STORAGE_SCHEME=SINGLE_CELL_ARRAY_WITH_OFFSETS, COLUMN_ENCODED_BYTES=2"); + SystemTransformRecord record = Transform.getTransformRecord(schemaName, dataTableName, null, null, conn.unwrap(PhoenixConnection.class)); + assertNotNull(record); + assertMetadata(conn, PTable.ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS, PTable.QualifierEncodingScheme.TWO_BYTE_QUALIFIERS, record.getNewPhysicalTableName()); + + String upsertQuery = String.format("UPSERT INTO %s VALUES(?, ?, ?)", dataTableFullName); + PreparedStatement stmt1 = conn.prepareStatement(upsertQuery); + IndexToolIT.upsertRow(stmt1, 1); + + IndexRegionObserver.setFailPostIndexUpdatesForTesting(true); + IndexToolIT.upsertRow(stmt1, 2); + + assertEquals(1, getRowCountForEmptyColValue(conn, record.getNewPhysicalTableName(), UNVERIFIED_BYTES)); + assertEquals(1, getRowCountForEmptyColValue(conn, record.getNewPhysicalTableName(), VERIFIED_BYTES)); + + IndexRegionObserver.setFailPostIndexUpdatesForTesting(false); + + Transform.doCutover(conn.unwrap(PhoenixConnection.class), record); + Transform.updateTransformRecord(conn.unwrap(PhoenixConnection.class), record, PTable.TransformStatus.COMPLETED); + + assertEquals(1, getRowCountForEmptyColValue(conn, record.getNewPhysicalTableName(), UNVERIFIED_BYTES)); + assertEquals(1, getRowCountForEmptyColValue(conn, record.getNewPhysicalTableName(), VERIFIED_BYTES)); + + // Now do read repair + String select = "SELECT * FROM " + dataTableFullName; + ResultSet rs = conn.createStatement().executeQuery(select); + assertTrue(rs.next()); + assertTrue(rs.next()); + assertFalse(rs.next()); + + assertEquals(0, getRowCountForEmptyColValue(conn, record.getNewPhysicalTableName(), UNVERIFIED_BYTES)); + assertEquals(2, getRowCountForEmptyColValue(conn, record.getNewPhysicalTableName(), VERIFIED_BYTES)); + } finally { + IndexRegionObserver.setFailPreIndexUpdatesForTesting(false); + IndexRegionObserver.setFailDataTableUpdatesForTesting(false); + IndexRegionObserver.setFailPostIndexUpdatesForTesting(false); + } + } + + @Test + public void testTransformIndexReadRepair() throws Exception { + String schemaName = generateUniqueName(); + String dataTableName = generateUniqueName(); + String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName); + String indexTableName = "IDX_" + generateUniqueName(); + String indexTableFullName = SchemaUtil.getTableName(schemaName, indexTableName); + + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + try (Connection conn = DriverManager.getConnection(getUrl(), props)) { + conn.setAutoCommit(true); + int numOfRows = 0; + createTableAndUpsertRows(conn, dataTableFullName, numOfRows, tableDDLOptions); + SingleCellIndexIT.assertMetadata(conn, PTable.ImmutableStorageScheme.ONE_CELL_PER_COLUMN, PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS, dataTableFullName); + + conn.createStatement().execute("CREATE INDEX " + indexTableName + " ON " + dataTableFullName + " (NAME) INCLUDE (ZIP)"); + SingleCellIndexIT.assertMetadata(conn, PTable.ImmutableStorageScheme.ONE_CELL_PER_COLUMN, PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS, indexTableFullName); + conn.createStatement().execute("ALTER INDEX " + indexTableName + " ON " + dataTableFullName + + " ACTIVE IMMUTABLE_STORAGE_SCHEME=SINGLE_CELL_ARRAY_WITH_OFFSETS, COLUMN_ENCODED_BYTES=2"); + + SystemTransformRecord record = Transform.getTransformRecord(schemaName, indexTableName, dataTableFullName, null, conn.unwrap(PhoenixConnection.class)); + assertNotNull(record); + assertMetadata(conn, PTable.ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS, PTable.QualifierEncodingScheme.TWO_BYTE_QUALIFIERS, record.getNewPhysicalTableName()); + + String upsertQuery = String.format("UPSERT INTO %s VALUES(?, ?, ?)", dataTableFullName); + PreparedStatement stmt1 = conn.prepareStatement(upsertQuery); + IndexToolIT.upsertRow(stmt1, 1); + + IndexRegionObserver.setFailPostIndexUpdatesForTesting(true); + IndexToolIT.upsertRow(stmt1, 2); + + assertEquals(1, getRowCountForEmptyColValue(conn, record.getNewPhysicalTableName(), UNVERIFIED_BYTES)); + assertEquals(1, getRowCountForEmptyColValue(conn, record.getNewPhysicalTableName(), VERIFIED_BYTES)); + + IndexRegionObserver.setFailPostIndexUpdatesForTesting(false); + + Transform.doCutover(conn.unwrap(PhoenixConnection.class), record); + + assertEquals(1, getRowCountForEmptyColValue(conn, record.getNewPhysicalTableName(), UNVERIFIED_BYTES)); + assertEquals(1, getRowCountForEmptyColValue(conn, record.getNewPhysicalTableName(), VERIFIED_BYTES)); + + // Now do read repair + String select = "SELECT NAME, ZIP FROM " + dataTableFullName; + ResultSet rs = conn.createStatement().executeQuery(select); + assertTrue(rs.next()); + assertTrue(rs.next()); + assertFalse(rs.next()); + + assertEquals(0, getRowCountForEmptyColValue(conn, record.getNewPhysicalTableName(), UNVERIFIED_BYTES)); + assertEquals(2, getRowCountForEmptyColValue(conn, record.getNewPhysicalTableName(), VERIFIED_BYTES)); + } finally { + IndexRegionObserver.setFailPreIndexUpdatesForTesting(false); + IndexRegionObserver.setFailDataTableUpdatesForTesting(false); + IndexRegionObserver.setFailPostIndexUpdatesForTesting(false); + } + } + + @Test public void testTransformMutationFailureRepair() throws Exception { String schemaName = generateUniqueName(); String dataTableName = generateUniqueName(); diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java index cff74ec..8532b68 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java @@ -127,6 +127,7 @@ abstract public class BaseScannerRegionObserver extends CompatBaseScannerRegionO public static final String EMPTY_COLUMN_FAMILY_NAME = "_EmptyCFName"; public static final String EMPTY_COLUMN_QUALIFIER_NAME = "_EmptyCQName"; public static final String INDEX_ROW_KEY = "_IndexRowKey"; + public static final String READ_REPAIR_TRANSFORMING_TABLE = "_ReadRepairTransformingTable"; public final static byte[] REPLAY_TABLE_AND_INDEX_WRITES = PUnsignedTinyint.INSTANCE.toBytes(1); public final static byte[] REPLAY_ONLY_INDEX_WRITES = PUnsignedTinyint.INSTANCE.toBytes(2); diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java index d985395..ee04f36 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java @@ -25,6 +25,7 @@ import java.util.NavigableMap; import java.util.Optional; import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CoprocessorEnvironment; import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Scan; @@ -39,6 +40,9 @@ import org.apache.hadoop.io.WritableUtils; import org.apache.phoenix.coprocessor.generated.DynamicColumnMetaDataProtos; import org.apache.phoenix.coprocessor.generated.PTableProtos; import org.apache.phoenix.expression.OrderByExpression; +import org.apache.phoenix.hbase.index.metrics.GlobalIndexCheckerSource; +import org.apache.phoenix.hbase.index.metrics.MetricsIndexerSourceFactory; +import org.apache.phoenix.index.GlobalIndexChecker; import org.apache.phoenix.iterate.NonAggregateRegionScannerFactory; import org.apache.phoenix.schema.PColumn; import org.apache.phoenix.schema.PColumnImpl; @@ -69,9 +73,14 @@ public class ScanRegionObserver extends BaseScannerRegionObserver implements Reg public static final String WILDCARD_SCAN_INCLUDES_DYNAMIC_COLUMNS = "_WildcardScanIncludesDynCols"; + private static boolean readRepairTransformingTable = false; + private static GlobalIndexChecker.GlobalIndexScanner globalIndexScanner; + private static GlobalIndexChecker globalIndexChecker = new GlobalIndexChecker(); + private static GlobalIndexCheckerSource metricsSource = MetricsIndexerSourceFactory.getInstance().getGlobalIndexCheckerSource(); + @Override public Optional<RegionObserver> getRegionObserver() { - return Optional.of(this); + return Optional.of(this); } public static void serializeIntoScan(Scan scan, int limit, @@ -191,6 +200,11 @@ public class ScanRegionObserver extends BaseScannerRegionObserver implements Reg @Override protected RegionScanner doPostScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c, final Scan scan, final RegionScanner s) throws Throwable { NonAggregateRegionScannerFactory nonAggregateROUtil = new NonAggregateRegionScannerFactory(c.getEnvironment()); + if (scan.getAttribute(BaseScannerRegionObserver.READ_REPAIR_TRANSFORMING_TABLE) != null) { + readRepairTransformingTable = true; + globalIndexScanner = globalIndexChecker.new GlobalIndexScanner(c.getEnvironment(), scan, s, metricsSource); + return nonAggregateROUtil.getRegionScanner(scan, globalIndexScanner); + } return nonAggregateROUtil.getRegionScanner(scan, s); } diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java index e9a394d..1c7b1a1 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java @@ -413,6 +413,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver } }); } + boolean useNewValueColumnQualifier = EncodedColumnsUtil.useNewValueColumnQualifier(scan); int offsetToBe = 0; if (localIndexScan) { diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java b/phoenix-core/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java index d9185cd..a220289 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java @@ -18,6 +18,7 @@ package org.apache.phoenix.index; import static org.apache.phoenix.index.IndexMaintainer.getIndexMaintainer; +import static org.apache.phoenix.query.QueryConstants.UNVERIFIED_BYTES; import static org.apache.phoenix.query.QueryConstants.VERIFIED_BYTES; import static org.apache.phoenix.schema.types.PDataType.TRUE_BYTES; import static org.apache.phoenix.util.ScanUtil.getDummyResult; @@ -64,6 +65,7 @@ import org.apache.phoenix.hbase.index.metrics.MetricsIndexerSourceFactory; import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.query.QueryServicesOptions; import org.apache.phoenix.schema.SortOrder; +import org.apache.phoenix.schema.transform.TransformMaintainer; import org.apache.phoenix.schema.types.PLong; import org.apache.phoenix.util.EnvironmentEdgeManager; import org.apache.phoenix.util.IndexUtil; @@ -123,7 +125,7 @@ public class GlobalIndexChecker extends BaseScannerRegionObserver implements Reg * An instance of this class is created for each scanner on an index * and used to verify individual rows and rebuild them if they are not valid */ - private class GlobalIndexScanner extends BaseRegionScanner { + public class GlobalIndexScanner extends BaseRegionScanner { private RegionScanner scanner; private long ageThreshold; private Scan scan; @@ -548,9 +550,18 @@ public class GlobalIndexChecker extends BaseScannerRegionObserver implements Reg while (cellIterator.hasNext()) { cell = cellIterator.next(); if (isEmptyColumn(cell)) { - if (Bytes.compareTo(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength(), - VERIFIED_BYTES, 0, VERIFIED_BYTES.length) != 0) { - return false; + if (indexMaintainer instanceof TransformMaintainer) { + // This is a transforming table. After cutoff, if there are new mutations on the table, + // their empty col value would be x. So, we are only interested in unverified ones. + if (Bytes.compareTo(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength(), + UNVERIFIED_BYTES, 0, UNVERIFIED_BYTES.length) == 0) { + return false; + } + } else { + if (Bytes.compareTo(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength(), + VERIFIED_BYTES, 0, VERIFIED_BYTES.length) != 0) { + return false; + } } // Empty column is not supposed to be returned to the client except it is the only column included // in the scan diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java index 5c8d6c7..9342ddc 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java @@ -26,6 +26,7 @@ import static org.apache.phoenix.iterate.TableResultIterator.RenewLeaseStatus.NO import static org.apache.phoenix.iterate.TableResultIterator.RenewLeaseStatus.RENEWED; import static org.apache.phoenix.iterate.TableResultIterator.RenewLeaseStatus.THRESHOLD_NOT_REACHED; import static org.apache.phoenix.iterate.TableResultIterator.RenewLeaseStatus.UNINITIALIZED; +import static org.apache.phoenix.schema.types.PDataType.TRUE_BYTES; import java.io.IOException; import java.sql.SQLException; diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java index 28e819b..eba2f1a 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java @@ -225,7 +225,7 @@ public class PhoenixDatabaseMetaData implements DatabaseMetaData { public static final String OLD_METADATA = "OLD_METADATA"; public static final String NEW_METADATA = "NEW_METADATA"; public static final String TRANSFORM_FUNCTION = "TRANSFORM_FUNCTION"; - public static final String TRANSFORM_TABLE_TTL = "864000"; + public static final String TRANSFORM_TABLE_TTL = "7776000"; // 90 days public static final int TTL_FOR_MUTEX = 15 * 60; // 15min public static final String ARRAY_SIZE = "ARRAY_SIZE"; diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java index cf203da..f465412 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java @@ -571,14 +571,14 @@ public interface QueryConstants { TRANSFORM_RETRY_COUNT + " INTEGER NULL," + TRANSFORM_START_TS + " TIMESTAMP NULL," + TRANSFORM_LAST_STATE_TS + " TIMESTAMP NULL," + - OLD_METADATA + " VARCHAR NULL,\n" + + OLD_METADATA + " VARBINARY NULL,\n" + NEW_METADATA + " VARCHAR NULL,\n" + TRANSFORM_FUNCTION + " VARCHAR NULL\n" + "CONSTRAINT " + SYSTEM_TABLE_PK_NAME + " PRIMARY KEY (" + TENANT_ID + "," + TABLE_SCHEM + "," + LOGICAL_TABLE_NAME + "))\n" + HConstants.VERSIONS + "=%s,\n" + ColumnFamilyDescriptorBuilder.KEEP_DELETED_CELLS + "=%s,\n" + - ColumnFamilyDescriptorBuilder.TTL + "=" + TRANSFORM_TABLE_TTL + ",\n" + // 10 days + ColumnFamilyDescriptorBuilder.TTL + "=" + TRANSFORM_TABLE_TTL + ",\n" + // 90 days TableDescriptorBuilder.SPLIT_POLICY + "='" + SystemTaskSplitPolicy.class.getName() + "',\n" + TRANSACTIONAL + "=" + Boolean.FALSE + ",\n" + diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/transform/SystemTransformRecord.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/transform/SystemTransformRecord.java index 63378bb..d6fec5a 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/transform/SystemTransformRecord.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/transform/SystemTransformRecord.java @@ -45,14 +45,14 @@ public class SystemTransformRecord { private final Integer transformRetryCount; private final Timestamp startTs; private final Timestamp lastStateTs; - private final String oldMetadata; + private final byte[] oldMetadata; private final String newMetadata; private final String transformFunction; public SystemTransformRecord(PTable.TransformType transformType, String schemaName, String logicalTableName, String tenantId, String newPhysicalTableName, String logicalParentName, String transformStatus, String transformJobId, Integer transformRetryCount, Timestamp startTs, - Timestamp lastStateTs, String oldMetadata, String newMetadata, String transformFunction) { + Timestamp lastStateTs, byte[] oldMetadata, String newMetadata, String transformFunction) { this.transformType = transformType; this.schemaName = schemaName; this.tenantId = tenantId; @@ -119,7 +119,7 @@ public class SystemTransformRecord { return lastStateTs; } - public String getOldMetadata() { + public byte[] getOldMetadata() { return oldMetadata; } public String getNewMetadata() { @@ -149,7 +149,7 @@ public class SystemTransformRecord { private int transformRetryCount =0; private Timestamp startTs = new Timestamp(EnvironmentEdgeManager.currentTimeMillis()); private Timestamp lastStateTs; - private String oldMetadata; + private byte[] oldMetadata; private String newMetadata; private String transformFunction; @@ -214,7 +214,7 @@ public class SystemTransformRecord { return this; } - public SystemTransformBuilder setOldMetadata(String oldMetadata) { + public SystemTransformBuilder setOldMetadata(byte[] oldMetadata) { this.oldMetadata = oldMetadata; return this; } @@ -268,7 +268,7 @@ public class SystemTransformRecord { builder.setTransformRetryCount(resultSet.getInt(col++)); builder.setStartTs(resultSet.getTimestamp(col++)); builder.setLastStateTs(resultSet.getTimestamp(col++)); - builder.setOldMetadata(resultSet.getString(col++)); + builder.setOldMetadata(resultSet.getBytes(col++)); builder.setNewMetadata(resultSet.getString(col++)); builder.setTransformFunction(resultSet.getString(col++)); diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/transform/Transform.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/transform/Transform.java index 8aecf1d..7276c65 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/transform/Transform.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/transform/Transform.java @@ -109,7 +109,7 @@ public class Transform { long sequenceNum, PTable.TransformType transformType) throws SQLException { try { String newMetadata = JacksonUtil.getObjectWriter().writeValueAsString(changingProperties); - String oldMetadata = ""; + byte[] oldMetadata = PTableImpl.toProto(table).toByteArray(); String newPhysicalTableName = ""; SystemTransformRecord.SystemTransformBuilder transformBuilder = new SystemTransformRecord.SystemTransformBuilder(); String schema = table.getSchemaName()!=null ? table.getSchemaName().getString() : null; @@ -401,9 +401,9 @@ public class Transform { stmt.setNull(colNum++, Types.TIMESTAMP); } if (systemTransformParams.getOldMetadata() != null) { - stmt.setString(colNum++, systemTransformParams.getOldMetadata()); + stmt.setBytes(colNum++, systemTransformParams.getOldMetadata()); } else { - stmt.setNull(colNum++, Types.VARCHAR); + stmt.setNull(colNum++, Types.VARBINARY); } if (systemTransformParams.getNewMetadata() != null) { stmt.setString(colNum++, systemTransformParams.getNewMetadata()); diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/transform/TransformMaintainer.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/transform/TransformMaintainer.java index da445f6..df33c41 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/transform/TransformMaintainer.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/transform/TransformMaintainer.java @@ -64,6 +64,7 @@ import java.io.DataInputStream; import java.io.DataOutput; import java.io.DataOutputStream; import java.io.IOException; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.HashSet; @@ -76,7 +77,7 @@ import static org.apache.phoenix.schema.PTable.QualifierEncodingScheme.NON_ENCOD public class TransformMaintainer extends IndexMaintainer { private boolean isMultiTenant; - // indexed expressions that are not present in the row key of the data table, the expression can also refer to a regular column + // expressions that are not present in the row key of the old table, the expression can also refer to a regular column private List<Expression> newTableExpressions; private Set<ColumnReference> newTableColumns; @@ -196,6 +197,19 @@ public class TransformMaintainer extends IndexMaintainer { } } + public Set<ColumnReference> getAllColumnsForDataTable() { + Set<ColumnReference> result = Sets.newLinkedHashSetWithExpectedSize(newTableExpressions.size() + coveredColumnsMap.size()); + result.addAll(newTableColumns); + for (ColumnReference colRef : coveredColumnsMap.keySet()) { + if (oldTableImmutableStorageScheme == PTable.ImmutableStorageScheme.ONE_CELL_PER_COLUMN) { + result.add(colRef); + } else { + result.add(new ColumnReference(colRef.getFamily(), QueryConstants.SINGLE_KEYVALUE_COLUMN_QUALIFIER_BYTES)); + } + } + return result; + } + /* * Build the old table row key */ @@ -376,6 +390,16 @@ public class TransformMaintainer extends IndexMaintainer { for (ServerCachingProtos.ColumnInfo info : newTblColumnInfoList) { maintainer.newTableColumnsInfo.add(new Pair<>(info.getFamilyName(), info.getColumnName())); } + maintainer.newTableExpressions = new ArrayList<>(); + try (ByteArrayInputStream stream = new ByteArrayInputStream(proto.getNewTableExpressions().toByteArray())) { + DataInput input = new DataInputStream(stream); + while (stream.available() > 0) { + int expressionOrdinal = WritableUtils.readVInt(input); + Expression expression = ExpressionType.values()[expressionOrdinal].newInstance(); + expression.readFields(input); + maintainer.newTableExpressions.add(expression); + } + } // proto doesn't support single byte so need an explicit cast here maintainer.newTableEncodingScheme = PTable.QualifierEncodingScheme.fromSerializedValue((byte) proto.getNewTableEncodingScheme()); maintainer.newTableImmutableStorageScheme = PTable.ImmutableStorageScheme.fromSerializedValue((byte) proto.getNewTableImmutableStorageScheme()); diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java index 878d89e..2d57d8f 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java @@ -58,6 +58,7 @@ import org.apache.phoenix.compile.ScanRanges; import org.apache.phoenix.compile.StatementContext; import org.apache.phoenix.coprocessor.BaseScannerRegionObserver; import org.apache.phoenix.coprocessor.MetaDataProtocol; +import org.apache.phoenix.coprocessor.generated.PTableProtos; import org.apache.phoenix.exception.SQLExceptionCode; import org.apache.phoenix.exception.SQLExceptionInfo; import org.apache.phoenix.execute.BaseQueryPlan; @@ -85,11 +86,16 @@ import org.apache.phoenix.schema.PColumn; import org.apache.phoenix.schema.PName; import org.apache.phoenix.schema.PTable; import org.apache.phoenix.schema.PTable.IndexType; +import org.apache.phoenix.schema.PTableImpl; import org.apache.phoenix.schema.PTableType; import org.apache.phoenix.schema.RowKeySchema; import org.apache.phoenix.schema.SortOrder; import org.apache.phoenix.schema.TableNotFoundException; import org.apache.phoenix.schema.ValueSchema.Field; +import org.apache.phoenix.schema.tool.SchemaExtractionProcessor; +import org.apache.phoenix.schema.transform.SystemTransformRecord; +import org.apache.phoenix.schema.transform.Transform; +import org.apache.phoenix.schema.transform.TransformMaintainer; import org.apache.phoenix.schema.tuple.ResultTuple; import org.apache.phoenix.schema.tuple.Tuple; import org.apache.phoenix.schema.types.PDataType; @@ -1129,46 +1135,89 @@ public class ScanUtil { } public static void setScanAttributesForIndexReadRepair(Scan scan, PTable table, PhoenixConnection phoenixConnection) throws SQLException { - if (table.isTransactional() || table.getType() != PTableType.INDEX) { - return; - } + boolean isTransforming = (table.getTransformingNewTable() != null); PTable indexTable = table; - if (indexTable.getIndexType() != PTable.IndexType.GLOBAL) { - return; - } - String schemaName = indexTable.getParentSchemaName().getString(); - String tableName = indexTable.getParentTableName().getString(); - PTable dataTable; - try { - dataTable = PhoenixRuntime.getTable(phoenixConnection, SchemaUtil.getTableName(schemaName, tableName)); - } catch (TableNotFoundException e) { - // This index table must be being deleted. No need to set the scan attributes - return; - } - // MetaDataClient modifies the index table name for view indexes if the parent view of an index has a child - // view. This, we need to recreate a PTable object with the correct table name for the rest of this code to work - if (indexTable.getViewIndexId() != null && indexTable.getName().getString().contains(QueryConstants.CHILD_VIEW_INDEX_NAME_SEPARATOR)) { - int lastIndexOf = indexTable.getName().getString().lastIndexOf(QueryConstants.CHILD_VIEW_INDEX_NAME_SEPARATOR); - String indexName = indexTable.getName().getString().substring(lastIndexOf + 1); - indexTable = PhoenixRuntime.getTable(phoenixConnection, indexName); - } - if (!dataTable.getIndexes().contains(indexTable)) { - return; - } - if (scan.getAttribute(PhoenixIndexCodec.INDEX_PROTO_MD) == null) { - ImmutableBytesWritable ptr = new ImmutableBytesWritable(); - IndexMaintainer.serialize(dataTable, ptr, Collections.singletonList(indexTable), phoenixConnection); - scan.setAttribute(PhoenixIndexCodec.INDEX_PROTO_MD, ByteUtil.copyKeyBytesIfNecessary(ptr)); - } - scan.setAttribute(BaseScannerRegionObserver.CHECK_VERIFY_COLUMN, TRUE_BYTES); - scan.setAttribute(BaseScannerRegionObserver.PHYSICAL_DATA_TABLE_NAME, dataTable.getPhysicalName().getBytes()); - IndexMaintainer indexMaintainer = indexTable.getIndexMaintainer(dataTable, phoenixConnection); - byte[] emptyCF = indexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary(); - byte[] emptyCQ = indexMaintainer.getEmptyKeyValueQualifier(); - scan.setAttribute(BaseScannerRegionObserver.EMPTY_COLUMN_FAMILY_NAME, emptyCF); - scan.setAttribute(BaseScannerRegionObserver.EMPTY_COLUMN_QUALIFIER_NAME, emptyCQ); - if (scan.getAttribute(BaseScannerRegionObserver.VIEW_CONSTANTS) == null) { - BaseQueryPlan.serializeViewConstantsIntoScan(scan, dataTable); + // Transforming index table can be repaired in regular path via globalindexchecker coproc on it. + // phoenixConnection is closed when it is called from mappers + if (!phoenixConnection.isClosed() && table.getType() == PTableType.TABLE && isTransforming) { + SystemTransformRecord systemTransformRecord = Transform.getTransformRecord(indexTable.getSchemaName(), indexTable.getTableName(), + null, phoenixConnection.getTenantId(), phoenixConnection); + if (systemTransformRecord == null) { + return; + } + // Old table is still active, cutover didn't happen yet, so, no need to read repair + if (!systemTransformRecord.getTransformStatus().equals(PTable.TransformStatus.COMPLETED.name())) { + return; + } + byte[] oldTableBytes = systemTransformRecord.getOldMetadata(); + if (oldTableBytes == null || oldTableBytes.length == 0) { + return; + } + PTable oldTable = null; + try { + oldTable = PTableImpl.createFromProto(PTableProtos.PTable.parseFrom(oldTableBytes)); + } catch (IOException e) { + LOGGER.error("Cannot parse old table info for read repair for table " + table.getName()); + return; + } + TransformMaintainer indexMaintainer = indexTable.getTransformMaintainer(oldTable, phoenixConnection); + // This is the path where we are reading from the newly transformed table + if (scan.getAttribute(PhoenixIndexCodec.INDEX_PROTO_MD) == null) { + ImmutableBytesWritable ptr = new ImmutableBytesWritable(); + TransformMaintainer.serialize(oldTable, ptr, indexTable, phoenixConnection); + scan.setAttribute(PhoenixIndexCodec.INDEX_PROTO_MD, ByteUtil.copyKeyBytesIfNecessary(ptr)); + } + scan.setAttribute(BaseScannerRegionObserver.CHECK_VERIFY_COLUMN, TRUE_BYTES); + scan.setAttribute(BaseScannerRegionObserver.PHYSICAL_DATA_TABLE_NAME, oldTable.getPhysicalName().getBytes()); + byte[] emptyCF = indexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary(); + byte[] emptyCQ = indexMaintainer.getEmptyKeyValueQualifier(); + scan.setAttribute(BaseScannerRegionObserver.EMPTY_COLUMN_FAMILY_NAME, emptyCF); + scan.setAttribute(BaseScannerRegionObserver.EMPTY_COLUMN_QUALIFIER_NAME, emptyCQ); + scan.setAttribute(BaseScannerRegionObserver.READ_REPAIR_TRANSFORMING_TABLE, TRUE_BYTES); + } else { + if (table.isTransactional() || table.getType() != PTableType.INDEX) { + return; + } + if (indexTable.getIndexType() != PTable.IndexType.GLOBAL) { + return; + } + + String schemaName = indexTable.getParentSchemaName().getString(); + String tableName = indexTable.getParentTableName().getString(); + PTable dataTable; + try { + dataTable = PhoenixRuntime.getTable(phoenixConnection, SchemaUtil.getTableName(schemaName, tableName)); + } catch (TableNotFoundException e) { + // This index table must be being deleted. No need to set the scan attributes + return; + } + // MetaDataClient modifies the index table name for view indexes if the parent view of an index has a child + // view. This, we need to recreate a PTable object with the correct table name for the rest of this code to work + if (indexTable.getViewIndexId() != null && indexTable.getName().getString().contains(QueryConstants.CHILD_VIEW_INDEX_NAME_SEPARATOR)) { + int lastIndexOf = indexTable.getName().getString().lastIndexOf(QueryConstants.CHILD_VIEW_INDEX_NAME_SEPARATOR); + String indexName = indexTable.getName().getString().substring(lastIndexOf + 1); + indexTable = PhoenixRuntime.getTable(phoenixConnection, indexName); + } + if (!dataTable.getIndexes().contains(indexTable)) { + return; + } + + if (scan.getAttribute(PhoenixIndexCodec.INDEX_PROTO_MD) == null) { + ImmutableBytesWritable ptr = new ImmutableBytesWritable(); + IndexMaintainer.serialize(dataTable, ptr, Collections.singletonList(indexTable), phoenixConnection); + scan.setAttribute(PhoenixIndexCodec.INDEX_PROTO_MD, ByteUtil.copyKeyBytesIfNecessary(ptr)); + } + scan.setAttribute(BaseScannerRegionObserver.CHECK_VERIFY_COLUMN, TRUE_BYTES); + scan.setAttribute(BaseScannerRegionObserver.PHYSICAL_DATA_TABLE_NAME, dataTable.getPhysicalName().getBytes()); + IndexMaintainer indexMaintainer = indexTable.getIndexMaintainer(dataTable, phoenixConnection); + byte[] emptyCF = indexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary(); + byte[] emptyCQ = indexMaintainer.getEmptyKeyValueQualifier(); + scan.setAttribute(BaseScannerRegionObserver.EMPTY_COLUMN_FAMILY_NAME, emptyCF); + scan.setAttribute(BaseScannerRegionObserver.EMPTY_COLUMN_QUALIFIER_NAME, emptyCQ); + if (scan.getAttribute(BaseScannerRegionObserver.VIEW_CONSTANTS) == null) { + BaseQueryPlan.serializeViewConstantsIntoScan(scan, dataTable); + } + addEmptyColumnToScan(scan, emptyCF, emptyCQ); } }