Repository: hive
Updated Branches:
  refs/heads/master 1aebe9d54 -> 63bdfa687


HIVE-15284: Add junit test to test replication scenarios (Sushanth Sowmyan reviewed by Vaibhav Gumashta)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/63bdfa68
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/63bdfa68
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/63bdfa68

Branch: refs/heads/master
Commit: 63bdfa6870614b6ae930a715dd8711addd16d2b7
Parents: 1aebe9d
Author: Vaibhav Gumashta <vgumas...@hortonworks.com>
Authored: Mon Nov 28 12:28:09 2016 -0800
Committer: Vaibhav Gumashta <vgumas...@hortonworks.com>
Committed: Mon Nov 28 12:28:09 2016 -0800

----------------------------------------------------------------------
 .../hive/ql/TestReplicationScenarios.java       | 238 +++++++++++++++++++
 .../ql/parse/ReplicationSemanticAnalyzer.java   |  47 ++--
 2 files changed, 267 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
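
The test below drives a full replication round trip through the Driver API: it creates a source database, runs REPL DUMP to produce a dump location, then runs REPL LOAD into a "<db>_dupe" copy and verifies the data survived the trip. A minimal sketch of that round trip, assuming a Driver configured as in setUpBeforeClass() below (the helper name replicateDb is hypothetical and not part of this patch):

    // Hypothetical helper sketching the REPL DUMP / REPL LOAD cycle the test exercises.
    static void replicateDb(Driver driver, String dbName)
        throws CommandNeedRetryException, IOException {
      driver.run("REPL DUMP " + dbName);          // result set carries the dump location
      List<String> rows = new ArrayList<String>();
      driver.getResults(rows);                    // row 0, col 0 of the \001-separated output
      String dumpLocn = rows.get(0).split("\\001")[0];
      driver.run("REPL LOAD " + dbName + "_dupe FROM '" + dumpLocn + "'");
    }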


http://git-wip-us.apache.org/repos/asf/hive/blob/63bdfa68/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java
new file mode 100644
index 0000000..01abe9b
--- /dev/null
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java
@@ -0,0 +1,238 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.cli.CliSessionState;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.MetaStoreUtils;
+import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.util.Shell;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+public class TestReplicationScenarios {
+
+  final static String DBNOTIF_LISTENER_CLASSNAME = "org.apache.hive.hcatalog.listener.DbNotificationListener";
+      // FIXME : replace with hive copy once that is copied
+  final static String tid =
+      TestReplicationScenarios.class.getCanonicalName().replace('.','_') + "_" + System.currentTimeMillis();
+  final static String TEST_PATH = System.getProperty("test.warehouse.dir","/tmp") + Path.SEPARATOR + tid;
+
+  static HiveConf hconf;
+  static boolean useExternalMS = false;
+  static int msPort;
+  static Driver driver;
+
+  protected static final Logger LOG = LoggerFactory.getLogger(TestReplicationScenarios.class);
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    hconf = new HiveConf(TestReplicationScenarios.class);
+    String metastoreUri = System.getProperty("test."+HiveConf.ConfVars.METASTOREURIS.varname);
+    if (metastoreUri != null) {
+      hconf.setVar(HiveConf.ConfVars.METASTOREURIS, metastoreUri);
+      useExternalMS = true;
+      return;
+    }
+    if (Shell.WINDOWS) {
+      WindowsPathUtil.convertPathsFromWindowsToHdfs(hconf);
+    }
+
+//    System.setProperty(HiveConf.ConfVars.METASTORE_EVENT_LISTENERS.varname,
+//        DBNOTIF_LISTENER_CLASSNAME); // turn on db notification listener on metastore
+    msPort = MetaStoreUtils.startMetaStore();
+    hconf.setVar(HiveConf.ConfVars.REPLDIR,TEST_PATH + "/hrepl/");
+    hconf.setVar(HiveConf.ConfVars.METASTOREURIS, "thrift://localhost:"
+        + msPort);
+    hconf.setIntVar(HiveConf.ConfVars.METASTORETHRIFTCONNECTIONRETRIES, 3);
+    hconf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, "");
+    hconf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, "");
+    hconf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname,
+        "false");
+    System.setProperty(HiveConf.ConfVars.PREEXECHOOKS.varname, " ");
+    System.setProperty(HiveConf.ConfVars.POSTEXECHOOKS.varname, " ");
+
+    Path testPath = new Path(TEST_PATH);
+    FileSystem fs = FileSystem.get(testPath.toUri(),hconf);
+    fs.mkdirs(testPath);
+
+    driver = new Driver(hconf);
+    SessionState.start(new CliSessionState(hconf));
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass(){
+    // FIXME : should clean up TEST_PATH, but not doing it now, for debugging's sake
+  }
+
+  @Before
+  public void setUp(){
+    // before each test
+  }
+
+  @After
+  public void tearDown(){
+    // after each test
+  }
+
+  /**
+   * Tests basic operation - creates a db, with 4 tables, 2 ptned and 2 unptned.
+   * Inserts data into one of the ptned tables, and one of the unptned tables,
+   * and verifies that a REPL DUMP followed by a REPL LOAD is able to load it
+   * appropriately.
+   */
+  @Test
+  public void testBasic() throws IOException {
+
+    String testName = "basic";
+    LOG.info("Testing "+testName);
+    String dbName = testName + "_" + tid;
+
+    run("CREATE DATABASE " + dbName);
+    run("CREATE TABLE " + dbName + ".unptned(a string) STORED AS TEXTFILE");
+    run("CREATE TABLE " + dbName + ".ptned(a string) partitioned by (b int) 
STORED AS TEXTFILE");
+    run("CREATE TABLE " + dbName + ".unptned_empty(a string) STORED AS 
TEXTFILE");
+    run("CREATE TABLE " + dbName + ".ptned_empty(a string) partitioned by (b 
int) STORED AS TEXTFILE");
+
+    String[] unptn_data = new String[]{ "eleven" , "twelve" };
+    String[] ptn_data_1 = new String[]{ "thirteen", "fourteen", "fifteen"};
+    String[] ptn_data_2 = new String[]{ "fifteen", "sixteen", "seventeen"};
+    String[] empty = new String[]{};
+
+    String unptn_locn = new Path(TEST_PATH , testName + "_unptn").toUri().getPath();
+    String ptn_locn_1 = new Path(TEST_PATH , testName + "_ptn1").toUri().getPath();
+    String ptn_locn_2 = new Path(TEST_PATH , testName + "_ptn2").toUri().getPath();
+
+    createTestDataFile(unptn_locn, unptn_data);
+    createTestDataFile(ptn_locn_1, ptn_data_1);
+    createTestDataFile(ptn_locn_2, ptn_data_2);
+
+    run("LOAD DATA LOCAL INPATH '" + unptn_locn + "' OVERWRITE INTO TABLE " + 
dbName + ".unptned");
+    run("SELECT * from " + dbName + ".unptned");
+    verifyResults(unptn_data);
+    run("LOAD DATA LOCAL INPATH '" + ptn_locn_1 + "' OVERWRITE INTO TABLE " + 
dbName + ".ptned PARTITION(b=1)");
+    run("SELECT a from " + dbName + ".ptned WHERE b=1");
+    verifyResults(ptn_data_1);
+    run("LOAD DATA LOCAL INPATH '" + ptn_locn_2 + "' OVERWRITE INTO TABLE " + 
dbName + ".ptned PARTITION(b=2)");
+    run("SELECT a from " + dbName + ".ptned WHERE b=2");
+    verifyResults(ptn_data_2);
+    run("SELECT a from " + dbName + ".ptned_empty");
+    verifyResults(empty);
+    run("SELECT * from " + dbName + ".unptned_empty");
+    verifyResults(empty);
+
+
+    run("REPL DUMP " + dbName);
+    String replDumpLocn = getResult(0,0);
+    run("REPL LOAD " + dbName + "_dupe FROM '"+replDumpLocn+"'");
+
+    run("SELECT * from " + dbName + "_dupe.unptned");
+    verifyResults(unptn_data);
+    run("SELECT a from " + dbName + "_dupe.ptned WHERE b=1");
+    verifyResults(ptn_data_1);
+    run("SELECT a from " + dbName + "_dupe.ptned WHERE b=2");
+    verifyResults(ptn_data_2);
+    run("SELECT a from " + dbName + ".ptned_empty");
+    verifyResults(empty);
+    run("SELECT * from " + dbName + ".unptned_empty");
+    verifyResults(empty);
+  }
+
+  private String getResult(int rowNum, int colNum) throws IOException {
+    List<String> results = new ArrayList<String>();
+    try {
+      driver.getResults(results);
+    } catch (CommandNeedRetryException e) {
+      e.printStackTrace();
+      throw new RuntimeException(e);
+    }
+    return (results.get(rowNum).split("\\001"))[colNum];
+  }
+
+  private void verifyResults(String[] data) throws IOException {
+    List<String> results = new ArrayList<String>();
+    try {
+      driver.getResults(results);
+    } catch (CommandNeedRetryException e) {
+      LOG.warn(e.getMessage(),e);
+      throw new RuntimeException(e);
+    }
+    LOG.info("Expecting {}",data);
+    LOG.info("Got {}",results);
+    assertEquals(data.length,results.size());
+    for (int i = 0; i < data.length; i++){
+      assertEquals(data[i],results.get(i));
+    }
+  }
+
+  private static void run(String cmd) throws RuntimeException {
+    run(cmd,false); // default arg-less run simply runs, and does not care about failure
+  }
+
+  private static boolean run(String cmd, boolean errorOnFail) throws RuntimeException {
+    boolean success = false;
+    try {
+      CommandProcessorResponse ret = driver.run(cmd);
+      success = (ret.getException() == null);
+      if (!success){
+        LOG.warn("Error {} : {} running [{}].", ret.getErrorCode(), 
ret.getErrorMessage(), cmd);
+      }
+    } catch (CommandNeedRetryException e) {
+      if (errorOnFail){
+        throw new RuntimeException(e);
+      } else {
+        LOG.warn(e.getMessage(),e);
+        // do nothing else
+      }
+    }
+    return success;
+  }
+
+  public static void createTestDataFile(String filename, String[] lines) throws IOException {
+    FileWriter writer = null;
+    try {
+      File file = new File(filename);
+      file.deleteOnExit();
+      writer = new FileWriter(file);
+      for (String line : lines) {
+        writer.write(line + "\n");
+      }
+    } finally {
+      if (writer != null) {
+        writer.close();
+      }
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/63bdfa68/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
index a4dfa3a..938355e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
@@ -22,7 +22,15 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
 import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+import org.apache.hadoop.hive.metastore.messaging.AddPartitionMessage;
+import org.apache.hadoop.hive.metastore.messaging.CreateTableMessage;
+import org.apache.hadoop.hive.metastore.messaging.EventUtils;
+import org.apache.hadoop.hive.metastore.messaging.InsertMessage;
+import org.apache.hadoop.hive.metastore.messaging.MessageDeserializer;
+import org.apache.hadoop.hive.metastore.messaging.MessageFactory;
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.QueryState;
 import org.apache.hadoop.hive.ql.exec.Task;
@@ -70,24 +78,27 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
     LOG.debug("ReplicationSemanticAanalyzer: analyzeInternal");
     LOG.debug(ast.getName() + ":" + ast.getToken().getText() + "=" + ast.getText());
     switch (ast.getToken().getType()) {
-    case TOK_REPL_DUMP: {
-      LOG.debug("ReplicationSemanticAnalyzer: analyzeInternal: dump");
-      initReplDump(ast);
-      analyzeReplDump(ast);
-    }
-    case TOK_REPL_LOAD: {
-      LOG.debug("ReplicationSemanticAnalyzer: analyzeInternal: load");
-      initReplLoad(ast);
-      analyzeReplLoad(ast);
-    }
-    case TOK_REPL_STATUS: {
-      LOG.debug("ReplicationSemanticAnalyzer: analyzeInternal: status");
-      initReplStatus(ast);
-      analyzeReplStatus(ast);
-    }
-    default: {
-      throw new SemanticException("Unexpected root token");
-    }
+      case TOK_REPL_DUMP: {
+        LOG.debug("ReplicationSemanticAnalyzer: analyzeInternal: dump");
+        initReplDump(ast);
+        analyzeReplDump(ast);
+        break;
+      }
+      case TOK_REPL_LOAD: {
+        LOG.debug("ReplicationSemanticAnalyzer: analyzeInternal: load");
+        initReplLoad(ast);
+        analyzeReplLoad(ast);
+        break;
+      }
+      case TOK_REPL_STATUS: {
+        LOG.debug("ReplicationSemanticAnalyzer: analyzeInternal: status");
+        initReplStatus(ast);
+        analyzeReplStatus(ast);
+        break;
+      }
+      default: {
+        throw new SemanticException("Unexpected root token");
+      }
     }
   }
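
The substantive change above is the added break statements: in the old code every case fell through, so a TOK_REPL_DUMP would run the dump, load, and status analyzers in sequence and then always land in the default case's SemanticException. A minimal standalone illustration of the fall-through hazard (plain Java, not Hive code):

    // Without break, a matched case keeps executing every following case,
    // including default - which is the bug the patch above fixes.
    static String dispatch(int tok) {
      StringBuilder trace = new StringBuilder();
      switch (tok) {
        case 1:
          trace.append("dump ");
          break;    // removing this break would append "load " and then throw
        case 2:
          trace.append("load ");
          break;
        default:
          throw new IllegalStateException("unexpected token: " + tok);
      }
      return trace.toString();
    }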
 
