[ https://issues.apache.org/jira/browse/PHOENIX-2460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16137666#comment-16137666 ]
ASF GitHub Bot commented on PHOENIX-2460:
-----------------------------------------
Github user JamesRTaylor commented on a diff in the pull request:
https://github.com/apache/phoenix/pull/269#discussion_r134624315
--- Diff: phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexScrutinyMapper.java ---
@@ -0,0 +1,349 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce.index;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Properties;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.phoenix.mapreduce.PhoenixJobCounters;
+import org.apache.phoenix.mapreduce.index.IndexScrutinyTool.OutputFormat;
+import org.apache.phoenix.mapreduce.index.IndexScrutinyTool.SourceTable;
+import org.apache.phoenix.mapreduce.util.ConnectionUtil;
+import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
+import org.apache.phoenix.parse.HintNode.Hint;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.util.ColumnInfo;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.QueryUtil;
+import org.apache.phoenix.util.SchemaUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Joiner;
+
+/**
+ * Mapper that reads from the data table and checks the rows against the index table
+ */
+public class IndexScrutinyMapper extends Mapper<NullWritable, PhoenixIndexDBWritable, Text, Text> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(IndexScrutinyMapper.class);
+    private Connection connection;
+    private List<ColumnInfo> targetTblColumnMetadata;
+    private long batchSize;
+    // holds a batch of rows from the table the mapper is iterating over
+    private List<List<Object>> currentBatchValues = new ArrayList<>();
+    private String targetTableQuery;
+    private int numTargetPkCols;
+    private boolean outputInvalidRows;
+    private OutputFormat outputFormat = OutputFormat.FILE;
+    private String qSourceTable;
+    private String qTargetTable;
+    private long executeTimestamp;
+    private int numSourcePkCols;
+    private final PhoenixIndexDBWritable indxWritable = new PhoenixIndexDBWritable();
+    private List<ColumnInfo> sourceTblColumnMetadata;
+
+    // used to write results to the output table
+    private Connection outputConn;
+    private PreparedStatement outputUpsertStmt;
+    private long outputMaxRows;
+
+    @Override
+    protected void setup(final Context context) throws IOException, InterruptedException {
+        super.setup(context);
+        final Configuration configuration = context.getConfiguration();
+        try {
+            // get a connection with correct CURRENT_SCN (so incoming writes don't throw off the
+            // scrutiny)
+            final Properties overrideProps = new Properties();
+            String scn = configuration.get(PhoenixConfigurationUtil.CURRENT_SCN_VALUE);
+            overrideProps.put(PhoenixRuntime.CURRENT_SCN_ATTRIB, scn);
+            connection = ConnectionUtil.getOutputConnection(configuration, overrideProps);
+            connection.setAutoCommit(false);
+            batchSize = PhoenixConfigurationUtil.getScrutinyBatchSize(configuration);
+            outputInvalidRows =
+                    PhoenixConfigurationUtil.getScrutinyOutputInvalidRows(configuration);
+            outputFormat = PhoenixConfigurationUtil.getScrutinyOutputFormat(configuration);
+            executeTimestamp = PhoenixConfigurationUtil.getScrutinyExecuteTimestamp(configuration);
+
+            // get the index table and column names
+            String qDataTable = PhoenixConfigurationUtil.getScrutinyDataTableName(configuration);
+            final PTable pdataTable = PhoenixRuntime.getTable(connection, qDataTable);
+            final String qIndexTable =
+                    PhoenixConfigurationUtil.getScrutinyIndexTableName(configuration);
+            final PTable pindexTable = PhoenixRuntime.getTable(connection, qIndexTable);
+
+            // set the target table based on whether we're running the MR over the data or index
+            // table
+            SourceTable sourceTable =
+                    PhoenixConfigurationUtil.getScrutinySourceTable(configuration);
+            SourceTargetColumnNames columnNames =
+                    SourceTable.DATA_TABLE_SOURCE.equals(sourceTable)
+                            ? new SourceTargetColumnNames.DataSourceColNames(pdataTable,
+                                    pindexTable)
+                            : new SourceTargetColumnNames.IndexSourceColNames(pdataTable,
+                                    pindexTable);
+            qSourceTable = columnNames.getQualifiedSourceTableName();
+            qTargetTable = columnNames.getQualifiedTargetTableName();
+            List<String> targetColNames = columnNames.getTargetColNames();
+            List<String> sourceColNames = columnNames.getSourceColNames();
+            List<String> targetPkColNames = columnNames.getTargetPkColNames();
+            String targetPksCsv =
+                    Joiner.on(",").join(SchemaUtil.getEscapedFullColumnNames(targetPkColNames));
+            numSourcePkCols = columnNames.getSourcePkColNames().size();
+            numTargetPkCols = targetPkColNames.size();
+
+            if (OutputFormat.TABLE.equals(outputFormat)) {
+                outputConn = ConnectionUtil.getOutputConnection(configuration, new Properties());
+                String upsertQuery = PhoenixConfigurationUtil.getUpsertStatement(configuration);
+                this.outputUpsertStmt = outputConn.prepareStatement(upsertQuery);
+            }
+            outputMaxRows = PhoenixConfigurationUtil.getScrutinyOutputMax(configuration);
+
+            // Create the query against the target table
+            // Our query projection should be all the index column names (or their data table
+            // equivalent name)
+            targetTableQuery =
+                    QueryUtil.constructSelectStatement(qTargetTable,
+                        columnNames.getCastedTargetColNames(), targetPksCsv,
+                        Hint.NO_INDEX, false) + " IN ";
+            targetTblColumnMetadata =
+                    PhoenixRuntime.generateColumnInfo(connection, qTargetTable, targetColNames);
+            sourceTblColumnMetadata =
+                    PhoenixRuntime.generateColumnInfo(connection, qSourceTable, sourceColNames);
+            LOG.info("Base query against target table: " + targetTableQuery);
+        } catch (SQLException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    protected void map(NullWritable key, PhoenixIndexDBWritable record, Context context)
+            throws IOException, InterruptedException {
+        try {
+            final List<Object> values = record.getValues();
+
+            context.getCounter(PhoenixJobCounters.INPUT_RECORDS).increment(1);
+            currentBatchValues.add(values);
+            if (context.getCounter(PhoenixJobCounters.INPUT_RECORDS).getValue() % batchSize != 0) {
+                // if we haven't hit the batch size, just report progress and move on to next record
+                context.progress();
+                return;
+            } else {
+                // otherwise, process the batch
+                processBatch(context);
+            }
+            context.progress(); // Make sure progress is reported to Application Master.
+        } catch (SQLException | IllegalArgumentException e) {
+            LOG.error("Error while reading/writing record", e);
+            context.getCounter(PhoenixJobCounters.FAILED_RECORDS).increment(1);
+            throw new IOException(e);
+        }
+    }
+
+    @Override
+    protected void cleanup(Context context) throws IOException, InterruptedException {
+        super.cleanup(context);
+        if (connection != null) {
+            try {
+                processBatch(context);
+                connection.close();
+                if (outputConn != null) {
+                    outputConn.close();
+                }
+            } catch (SQLException e) {
+                LOG.error("Error while closing connection in the IndexScrutinyMapper class", e);
+                throw new IOException(e);
+            }
+        }
+    }
+
+    private void processBatch(Context context)
+            throws SQLException, IOException, InterruptedException {
+        if (currentBatchValues.size() == 0) return;
+        context.getCounter(PhoenixScrutinyJobCounters.BATCHES_PROCESSED_COUNT).increment(1);
+        // our query selection filter should be the PK columns of the target table (index or data
+        // table)
+        String inClause =
+                QueryUtil.constructParameterizedInClause(numTargetPkCols,
+                    currentBatchValues.size());
+        String indexQuery = targetTableQuery + inClause;
+        PreparedStatement targetStatement = connection.prepareStatement(indexQuery);
+
+        // while we build the PreparedStatement, we also maintain a hash of the target table PKs,
+        // which we use to join against the results of the query on the target table
+        Map<Integer, List<Object>> targetPkToSourceValues = buildTargetStatement(targetStatement);
+
+        // fetch results from the target table and output invalid rows
+        queryTargetTable(context, targetStatement, targetPkToSourceValues);
+
+        // any source values we have left over are invalid (e.g. data table rows without
+        // corresponding index row)
+        context.getCounter(PhoenixScrutinyJobCounters.INVALID_ROW_COUNT)
+                .increment(targetPkToSourceValues.size());
+        if (outputInvalidRows) {
+            for (List<Object> sourceRowWithoutTargetRow : targetPkToSourceValues.values()) {
+                if (OutputFormat.FILE.equals(outputFormat)) {
+                    context.write(new Text(Arrays.toString(sourceRowWithoutTargetRow.toArray())),
+                        new Text("Target row not found"));
+                } else if (OutputFormat.TABLE.equals(outputFormat)) {
+                    writeToOutputTable(context, sourceRowWithoutTargetRow, null);
+                }
+            }
+        }
+        if (outputInvalidRows && OutputFormat.TABLE.equals(outputFormat)) {
+            outputUpsertStmt.executeBatch(); // write out invalid rows to output table
+            outputConn.commit();
+        }
+        currentBatchValues.clear();
+    }
+
+    private Map<Integer, List<Object>> buildTargetStatement(PreparedStatement targetStatement)
+            throws SQLException {
+        Map<Integer, List<Object>> targetPkToSourceValues =
+                new HashMap<>(currentBatchValues.size());
+        int rsIndex = 1;
+        for (List<Object> batchRow : currentBatchValues) {
+            // our original query against the source table (which provided the batchRow) projected
+            // with the data table PK cols first, so the first numTargetPkCols form the PK
+            int targetPkHash = getPkHash(batchRow.subList(0, numTargetPkCols));
+            targetPkToSourceValues.put(targetPkHash, batchRow);
+            for (int i = 0; i < numTargetPkCols; i++) {
+                ColumnInfo targetPkInfo = targetTblColumnMetadata.get(i);
+                Object value = batchRow.get(i);
+                if (value == null) {
+                    targetStatement.setNull(rsIndex++, targetPkInfo.getSqlType());
+                } else {
+                    targetStatement.setObject(rsIndex++, value, targetPkInfo.getSqlType());
+                }
+            }
+        }
+        return targetPkToSourceValues;
+    }
+
+    private void queryTargetTable(Context context, PreparedStatement targetStatement,
+            Map<Integer, List<Object>> targetPkToSourceValues)
+            throws SQLException, IOException, InterruptedException {
+        ResultSet targetResultSet = targetStatement.executeQuery();
+        while (targetResultSet.next()) {
+            indxWritable.readFields(targetResultSet);
+            List<Object> targetValues = indxWritable.getValues();
+            // first grab the PK and try to join against the source input
+            // the query is such that first numTargetPkCols of the resultSet is the PK
+            List<Object> pkObjects = new ArrayList<>(numTargetPkCols);
+            for (int i = 0; i < numTargetPkCols; i++) {
+                Object pkPart = targetResultSet.getObject(i + 1);
--- End diff --
If you want to get at the time stamp of the underlying cell, you can do something like the following:

    targetResultSet.unwrap(PhoenixResultSet.class).getCurrentRow().get(0).getTimestamp();

This would give you back the time stamp of the first Cell from the scan that was executed (we only return a single cell back with the selected expressions packed into the value). Not sure if this is useful or not, but just wanted to pass it along.
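
For illustration only, here is a minimal self-contained sketch of a helper built around that call. It is not part of the patch: the class name CellTimestampExample is hypothetical, PhoenixResultSet is assumed to be org.apache.phoenix.jdbc.PhoenixResultSet, and the row Tuple is assumed to expose getValue(int) (the comment above abbreviates the accessor as get(0)).

    import java.sql.ResultSet;
    import java.sql.SQLException;

    import org.apache.phoenix.jdbc.PhoenixResultSet;

    // Hypothetical helper, not part of the patch: reads the HBase timestamp of the
    // Cell backing the current row of a Phoenix ResultSet.
    public final class CellTimestampExample {

        private CellTimestampExample() {
        }

        public static long currentRowTimestamp(ResultSet rs) throws SQLException {
            // Unwrap the JDBC ResultSet to Phoenix's implementation to reach the row Tuple;
            // the scan returns a single Cell with the selected expressions packed into its
            // value, so the first Cell's timestamp is the timestamp of the scanned row.
            return rs.unwrap(PhoenixResultSet.class)
                    .getCurrentRow()
                    .getValue(0)
                    .getTimestamp();
        }
    }

Inside queryTargetTable() above, something like currentRowTimestamp(targetResultSet) could be invoked after each targetResultSet.next() if the scrutiny ever needs to compare the target row's write timestamp against executeTimestamp.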
> Implement scrutiny command to validate whether or not an index is in sync
> with the data table
> ---------------------------------------------------------------------------------------------
>
> Key: PHOENIX-2460
> URL: https://issues.apache.org/jira/browse/PHOENIX-2460
> Project: Phoenix
> Issue Type: Bug
> Reporter: James Taylor
> Assignee: Vincent Poon
> Attachments: PHOENIX-2460.patch
>
>
> We should have a process that runs to verify that an index is valid against a
> data table and potentially fixes it if discrepancies are found. This could
> either be an MR job or a low-priority background task.
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)