dbwong commented on code in PR #1430:
URL: https://github.com/apache/phoenix/pull/1430#discussion_r910708348


##########
phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixHAAdminTool.java:
##########
@@ -0,0 +1,634 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.jdbc;
+
+import java.io.Closeable;
+import java.io.FileReader;
+import java.io.IOException;
+import java.io.Reader;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+
+import org.apache.phoenix.thirdparty.org.apache.commons.cli.CommandLine;
+import org.apache.phoenix.thirdparty.org.apache.commons.cli.CommandLineParser;
+import org.apache.phoenix.thirdparty.org.apache.commons.cli.HelpFormatter;
+import org.apache.phoenix.thirdparty.org.apache.commons.cli.Option;
+import org.apache.phoenix.thirdparty.org.apache.commons.cli.Options;
+import org.apache.phoenix.thirdparty.org.apache.commons.cli.PosixParser;
+import org.apache.commons.io.FilenameUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.curator.RetryPolicy;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.recipes.atomic.AtomicValue;
+import org.apache.curator.framework.recipes.atomic.DistributedAtomicValue;
+import org.apache.curator.utils.ZKPaths;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.util.PairOfSameType;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.util.GenericOptionsParser;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.phoenix.jdbc.ClusterRoleRecord.ClusterRole;
+import org.apache.phoenix.util.JacksonUtil;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException.NodeExistsException;
+import org.apache.zookeeper.KeeperException.NoNodeException;
+import 
org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting;
+import org.apache.phoenix.thirdparty.com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The command line tool to manage high availability (HA) groups and their 
cluster roles.
+ */
+public class PhoenixHAAdminTool extends Configured implements Tool {
+    // Following are return value of this tool. We need this to be very 
explicit because external
+    // system calling this tool may need to retry, alert or audit the 
operations of cluster roles.
+    public static final int RET_SUCCESS = 0; // Saul Goodman
+    public static final int RET_ARGUMENT_ERROR = 1; // arguments are invalid
+    public static final int RET_SYNC_ERROR = 2; //  error to sync from 
manifest to ZK
+    public static final int RET_REPAIR_FOUND_INCONSISTENCIES = 3; // error to 
repair current ZK
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(PhoenixHAAdminTool.class);
+
+    private static final Option HELP_OPT = new Option("h", "help", false, 
"Show this help");
+    private static final Option FORCEFUL_OPT =
+            new Option("F", "forceful", false,
+                    "Forceful writing cluster role records ignoring errors on 
other clusters");
+    private static final Option MANIFEST_OPT =
+            new Option("m", "manifest", true, "Manifest file containing 
cluster role records");
+    private static final Option LIST_OPT =
+            new Option("l", "list", false, "List all HA groups stored on this 
ZK cluster");
+    private static final Option REPAIR_OPT = new Option("r", "repair", false,
+            "Verify all HA groups stored on this ZK cluster and repair if 
inconsistency found");
+    @VisibleForTesting
+    static final Options OPTIONS = new Options()
+            .addOption(HELP_OPT)
+            .addOption(FORCEFUL_OPT)
+            .addOption(MANIFEST_OPT)
+            .addOption(LIST_OPT)
+            .addOption(REPAIR_OPT);
+
+    @Override
+    public int run(String[] args) throws Exception {
+        CommandLine commandLine;
+        try {
+            commandLine = parseOptions(args);
+        } catch (Exception e) {
+            System.err.println(
+                    "ERROR: Unable to parse command-line arguments " + 
Arrays.toString(args) + " due to: " + e);
+            printUsageMessage();
+            return RET_ARGUMENT_ERROR;
+        }
+
+        if (commandLine.hasOption(HELP_OPT.getOpt())) {
+            printUsageMessage();
+            return RET_SUCCESS;
+        } else if (commandLine.hasOption(LIST_OPT.getOpt())) { // list
+            String zkUrl = getLocalZkUrl(getConf()); // Admin is created 
against local ZK cluster
+            try (PhoenixHAAdminHelper admin = new PhoenixHAAdminHelper(zkUrl, 
getConf(), HighAvailibilityCuratorProvider.INSTANCE)) {
+                List<ClusterRoleRecord> records = 
admin.listAllClusterRoleRecordsOnZookeeper();
+                JacksonUtil.getObjectWriterPretty().writeValue(System.out, 
records);
+            }
+        } else if (commandLine.hasOption(MANIFEST_OPT.getOpt())) { // create 
or update
+            String fileName = 
commandLine.getOptionValue(MANIFEST_OPT.getOpt());
+            List<ClusterRoleRecord> records = readRecordsFromFile(fileName);
+            boolean forceful = commandLine.hasOption(FORCEFUL_OPT.getOpt());
+            Map<String, List<String>> failedHaGroups = 
syncClusterRoleRecords(records, forceful);
+            if (!failedHaGroups.isEmpty()) {
+                System.out.println("Found following HA groups are failing to 
write the clusters:");
+                failedHaGroups.forEach((k, v) ->
+                        System.out.printf("%s -> [%s]\n", k, String.join(",", 
v)));
+                return RET_SYNC_ERROR;
+            }
+        } else if (commandLine.hasOption(REPAIR_OPT.getOpt()))  { // verify 
and repair
+            String zkUrl = getLocalZkUrl(getConf()); // Admin is created 
against local ZK cluster
+            try (PhoenixHAAdminHelper admin = new PhoenixHAAdminHelper(zkUrl, 
getConf(), HighAvailibilityCuratorProvider.INSTANCE)) {
+                List<String> inconsistentRecord = 
admin.verifyAndRepairWithRemoteZnode();
+                if (!inconsistentRecord.isEmpty()) {
+                    System.out.println("Found following inconsistent cluster 
role records: ");
+                    System.out.print(String.join(",", inconsistentRecord));
+                    return RET_REPAIR_FOUND_INCONSISTENCIES;
+                }
+            }
+        }
+        return RET_SUCCESS;
+    }
+
+    /**
+     * Read cluster role records defined in the file, given file name.
+     *
+     * @param file The local manifest file name to read from
+     * @return list of cluster role records defined in the manifest file
+     * @throws Exception when parsing or reading from the input file
+     */
+    @VisibleForTesting
+    List<ClusterRoleRecord> readRecordsFromFile(String file) throws Exception {
+        Preconditions.checkArgument(!StringUtils.isEmpty(file));
+        String fileType = FilenameUtils.getExtension(file);
+        switch (fileType) {
+        case "json":
+            // TODO: use jackson or standard JSON library according to 
PHOENIX-5789
+            try (Reader reader = new FileReader(file)) {
+                ClusterRoleRecord[] records =
+                        
JacksonUtil.getObjectReader(ClusterRoleRecord[].class).readValue(reader);
+                return Arrays.asList(records);
+            }
+        case "yaml":
+            LOG.error("YAML file is not yet supported. See W-8274533");
+        default:
+            throw new Exception("Can not read cluster role records from file 
'" + file + "' " +
+                    "reason: unsupported file type");
+        }
+    }
+
+    /**
+     * Helper method to write the given cluster role records into the ZK 
clusters respectively.
+     *
+     * // TODO: add retry logics
+     *
+     * @param records The cluster role record list to save on ZK
+     * @param forceful if true, this method will ignore errors on other 
clusters; otherwise it will
+     *                 not update next cluster (in order) if there is any 
failure on current cluster
+     * @return a map of HA group name to list cluster's url for cluster role 
record failing to write
+     */
+    private Map<String, List<String>> 
syncClusterRoleRecords(List<ClusterRoleRecord> records,
+            boolean forceful) throws IOException {
+        Map<String, List<String>> failedHaGroups = new HashMap<>();
+        for (ClusterRoleRecord record : records) {
+            String haGroupName = record.getHaGroupName();
+            try (PhoenixHAAdminHelper admin1 = new 
PhoenixHAAdminHelper(record.getZk1(), getConf(), 
HighAvailibilityCuratorProvider.INSTANCE);
+                    PhoenixHAAdminHelper admin2 = new 
PhoenixHAAdminHelper(record.getZk2(), getConf(), 
HighAvailibilityCuratorProvider.INSTANCE)) {
+                // Update the cluster previously ACTIVE cluster first.
+                // It reduces the chances of split-brain between clients and 
clusters.
+                // If can not determine previous ACTIVE cluster, update new 
STANDBY cluster first.
+                final PairOfSameType<PhoenixHAAdminHelper> pair;
+                if (admin1.isCurrentActiveCluster(haGroupName)) {
+                    pair = new PairOfSameType<>(admin1, admin2);
+                } else if (admin2.isCurrentActiveCluster(haGroupName)) {
+                    pair = new PairOfSameType<>(admin2, admin1);
+                } else if (record.getRole(admin1.getZkUrl()) == 
ClusterRole.STANDBY) {
+                    pair = new PairOfSameType<>(admin1, admin2);
+                } else {
+                    pair = new PairOfSameType<>(admin2, admin1);

Review Comment:
   I believe this final branch handles the case where cluster 1 is offline or in an
unknown state. In general we shouldn't see this state.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to