GitHub user twdsilva commented on a diff in the pull request:

    https://github.com/apache/phoenix/pull/309#discussion_r204172869
  
    --- Diff: 
phoenix-core/src/main/java/org/apache/phoenix/mapreduce/VerifyReplicationTool.java
 ---
    @@ -0,0 +1,477 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.phoenix.mapreduce;
    +
    +import java.io.IOException;
    +import java.sql.SQLException;
    +import java.util.Collections;
    +import java.util.Map;
    +
    +import org.apache.commons.cli.CommandLine;
    +import org.apache.commons.cli.CommandLineParser;
    +import org.apache.commons.cli.HelpFormatter;
    +import org.apache.commons.cli.Option;
    +import org.apache.commons.cli.Options;
    +import org.apache.commons.cli.ParseException;
    +import org.apache.commons.cli.PosixParser;
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.hbase.HBaseConfiguration;
    +import org.apache.hadoop.hbase.HConstants;
    +import org.apache.hadoop.hbase.client.Scan;
    +import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    +import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
    +import org.apache.hadoop.hbase.util.Bytes;
    +import org.apache.hadoop.io.NullWritable;
    +import org.apache.hadoop.mapreduce.Job;
    +import org.apache.hadoop.mapreduce.Mapper;
    +import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
    +import org.apache.hadoop.util.Tool;
    +import org.apache.hadoop.util.ToolRunner;
    +import org.apache.phoenix.compile.QueryPlan;
    +import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
    +import org.apache.phoenix.iterate.ResultIterator;
    +import org.apache.phoenix.jdbc.PhoenixResultSet;
    +import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
    +import org.apache.phoenix.mapreduce.util.PhoenixMapReduceUtil;
    +import org.apache.phoenix.util.EnvironmentEdgeManager;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import com.google.common.annotations.VisibleForTesting;
    +import com.google.common.base.Preconditions;
    +import com.google.common.base.Strings;
    +
    +/**
    + * Map only job that compares data across a source and target table. The 
target table can be on the
    + * same cluster or on a remote cluster. SQL conditions may be specified to 
compare only a subset of
    + * both tables.
    + */
    +public class VerifyReplicationTool implements Tool {
    +    // SLF4J logger for this class.
    +    private static final Logger LOG = 
LoggerFactory.getLogger(VerifyReplicationTool.class);
    +
    +    // Command-line options, each built as Option(shortName, longName, hasArg, description).
    +    // Whether an option is required or optional is stated only in its description text;
    +    // NOTE(review): enforcement of "(required)" is not visible in this excerpt — confirm.
    +    static final Option
    +            ZK_QUORUM_OPT =
    +            new Option("z", "zookeeper", true, "ZooKeeper connection 
details (optional)");
    +    static final Option
    +            TABLE_NAME_OPT =
    +            new Option("t", "table", true, "Phoenix table name 
(required)");
    +    static final Option
    +            TARGET_TABLE_NAME_OPT =
    +            new Option("tt", "target-table", true, "Target Phoenix table 
name (optional)");
    +    static final Option
    +            TARGET_ZK_QUORUM_OPT =
    +            new Option("tz", "target-zookeeper", true,
    +                    "Target ZooKeeper connection details (optional)");
    +    static final Option
    +            CONDITIONS_OPT =
    +            new Option("c", "conditions", true,
    +                    "Conditions for select query WHERE clause (optional)");
    +    // NOTE(review): unlike its sibling constants this one lacks the _OPT suffix;
    +    // renaming it would touch every reference, so it is left unchanged here.
    +    static final Option TIMESTAMP =
    +            new Option("ts", "timestamp", true,
    +                    "Timestamp in millis used to compare the two tables.  
Defaults to current time minus 60 seconds");
    +
    +    // Takes no argument (hasArg == false).
    +    static final Option HELP_OPT = new Option("h", "help", false, "Show 
this help and quit");
    +
    +    // Hadoop configuration; the constructor assigns it after a null check.
    +    private Configuration conf;
    +
    +    // Values corresponding to the options above. NOTE(review): how they are populated
    +    // (option parsing and/or the Builder) is outside this excerpt — confirm.
    +    private String zkQuorum;
    +    private String tableName;
    +    private String targetTableName;
    +    private String targetZkQuorum;
    +    private String sqlConditions;
    +    private long timestamp;
    +
    +    /**
    +     * Creates a tool bound to the given Hadoop configuration.
    +     *
    +     * @param conf the configuration to use; must not be null
    +     * @throws NullPointerException if {@code conf} is null (raised by Preconditions.checkNotNull)
    +     */
    +    VerifyReplicationTool(Configuration conf) {
    +        this.conf = Preconditions.checkNotNull(conf, "Configuration cannot 
be null");
    +    }
    +
    +    /**
    +     * Returns a new {@link Builder} wrapping the given configuration.
    +     *
    +     * @param conf the Hadoop configuration handed to the builder
    +     * @return a freshly created {@link Builder}
    +     */
    +    public static Builder newBuilder(Configuration conf) {
    +        final Builder builder = new Builder(conf);
    +        return builder;
    +    }
    +
    +    public static class Verifier
    +            extends Mapper<NullWritable, VerifyReplicationSourceWritable, 
NullWritable, NullWritable> {
    +
    +        /** Query plan obtained in {@code setup} from the job configuration. */
    +        private QueryPlan targetQueryPlan;
    +        /** Starts out null; {@code map} checks for null before initializing target-side state. */
    +        private PhoenixResultSet targetResultSet;
    +        private boolean targetHasNext;
    +        /** Set to true by {@code map} whenever it receives a source record. */
    +        private boolean sourceHasData;
    +
    +        /**
    +         * Row-comparison outcome categories. NOTE(review): presumably incremented as Hadoop
    +         * job counters later in {@code map} — confirm against the rest of the method.
    +         */
    +        public enum Counter {
    +            GOODROWS,
    +            BADROWS,
    +            ONLY_IN_SOURCE_TABLE_ROWS,
    +            ONLY_IN_TARGET_TABLE_ROWS,
    +            CONTENT_DIFFERENT_ROWS
    +        }
    +
    +        /**
    +         * Builds the query plan stored in {@code targetQueryPlan} from the job configuration.
    +         * NOTE(review): the {@code true} flag passed to getQueryPlan presumably selects the
    +         * target-table query — confirm against PhoenixMapReduceUtil.
    +         *
    +         * @throws IOException if the query plan cannot be created; the originating
    +         *         {@link SQLException} is attached as the cause
    +         */
    +        @Override
    +        protected void setup(Context context) throws IOException, InterruptedException {
    +            super.setup(context);
    +            try {
    +                targetQueryPlan =
    +                        PhoenixMapReduceUtil.getQueryPlan(context.getConfiguration(), true);
    +            } catch (SQLException e) {
    +                // Keep the SQLException as the cause so its stack trace and SQL state survive.
    +                throw new IOException(e.getMessage(), e);
    +            }
    +        }
    +
    +        @Override
    +        protected void map(NullWritable key, 
VerifyReplicationSourceWritable value, Context context)
    +                throws IOException, InterruptedException {
    +
    +            sourceHasData = true;
    +            try {
    +                if(targetResultSet == null) {
    +                    Configuration conf = context.getConfiguration();
    +                    byte[] targetStartRow = null;
    +                    byte[] targetStopRow = null;
    +                    // find source table split
    +                    PhoenixInputSplit sourceInputSplit = 
(PhoenixInputSplit) context.getInputSplit();
    +                    if(key != null) {
    +                        targetStartRow = value.getSourceKey().get();
    +                    }
    +                    if(sourceInputSplit.getLength() != 0) {
    +                        targetStopRow = 
sourceInputSplit.getKeyRange().getUpperRange();
    --- End diff --
    
    @karanmehta93 It always uses RegionSizeCalculator to generate the splits,
right? I don't see a problem with setting targetStopRow based on the upper
range of the split. Will the key ever be null? If it can be, should
targetStartRow be set to the lower range of the split instead?


---

Reply via email to