HIVE-18298: Fix TestReplicationScenarios.testConstraints (Daniel Dai, reviewed by Sankar Hariappan)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/c0734ac9
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/c0734ac9
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/c0734ac9

Branch: refs/heads/standalone-metastore
Commit: c0734ac91bdd3e81b17d93fcfeddd4503430eea8
Parents: ac24781
Author: Daniel Dai <da...@hortonworks.com>
Authored: Sat Jan 13 21:02:51 2018 -0800
Committer: Daniel Dai <da...@hortonworks.com>
Committed: Sat Jan 13 21:02:51 2018 -0800

----------------------------------------------------------------------
 .../hadoop/hive/ql/exec/repl/ReplDumpTask.java  | 31 +++++++--
 .../filesystem/ConstraintEventsIterator.java    | 34 +++++++--
 .../repl/bootstrap/load/LoadConstraint.java     | 72 +++++++++++---------
 3 files changed, 95 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/c0734ac9/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
index bd4b0bd..ce0757c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
@@ -74,6 +74,21 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
   private static final String FUNCTIONS_ROOT_DIR_NAME = "_functions";
   private static final String CONSTRAINTS_ROOT_DIR_NAME = "_constraints";
   private static final String FUNCTION_METADATA_FILE_NAME = "_metadata";
+  public enum ConstraintFileType {COMMON("common", "c_"), FOREIGNKEY("fk", "f_");
+    private final String name;
+    private final String prefix;
+    private ConstraintFileType(String name, String prefix) {
+      this.name = name;
+      this.prefix = prefix;
+    }
+    public String getName() {
+      return this.name;
+    }
+
+    public String getPrefix() {
+      return prefix;
+    }
+  }
 
   private Logger LOG = LoggerFactory.getLogger(ReplDumpTask.class);
   private ReplLogger replLogger;
@@ -316,17 +331,25 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
   private void dumpConstraintMetadata(String dbName, String tblName, Path dbRoot) throws Exception {
     try {
       Path constraintsRoot = new Path(dbRoot, CONSTRAINTS_ROOT_DIR_NAME);
-      Path constraintsFile = new Path(constraintsRoot, tblName);
+      Path commonConstraintsFile = new Path(constraintsRoot, ConstraintFileType.COMMON.getPrefix() + tblName);
+      Path fkConstraintsFile = new Path(constraintsRoot, ConstraintFileType.FOREIGNKEY.getPrefix() + tblName);
       Hive db = getHive();
       List<SQLPrimaryKey> pks = db.getPrimaryKeyList(dbName, tblName);
       List<SQLForeignKey> fks = db.getForeignKeyList(dbName, tblName);
       List<SQLUniqueConstraint> uks = db.getUniqueConstraintList(dbName, tblName);
       List<SQLNotNullConstraint> nns = db.getNotNullConstraintList(dbName, tblName);
-      if ((pks != null && !pks.isEmpty()) || (fks != null && !fks.isEmpty()) || (uks != null && !uks.isEmpty())
+      if ((pks != null && !pks.isEmpty()) || (uks != null && !uks.isEmpty())
           || (nns != null && !nns.isEmpty())) {
         try (JsonWriter jsonWriter =
-            new JsonWriter(constraintsFile.getFileSystem(conf), constraintsFile)) {
-          ConstraintsSerializer serializer = new ConstraintsSerializer(pks, fks, uks, nns, conf);
+            new JsonWriter(commonConstraintsFile.getFileSystem(conf), commonConstraintsFile)) {
+          ConstraintsSerializer serializer = new ConstraintsSerializer(pks, null, uks, nns, conf);
+          serializer.writeTo(jsonWriter, null);
+        }
+      }
+      if (fks != null && !fks.isEmpty()) {
+        try (JsonWriter jsonWriter =
+            new JsonWriter(fkConstraintsFile.getFileSystem(conf), fkConstraintsFile)) {
+          ConstraintsSerializer serializer = new ConstraintsSerializer(null, fks, null, null, conf);
           serializer.writeTo(jsonWriter, null);
         }
       }
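
The dump side now writes up to two constraint files per table under the
_constraints directory: a "c_"-prefixed file carrying primary key, unique,
and not-null constraints, and an "f_"-prefixed file carrying only foreign
keys, so the two groups can be replayed separately. A minimal sketch of how
the ConstraintFileType enum above maps a table name to those file names
(the demo class and sample table name are illustrative, not part of the
patch):

    import org.apache.hadoop.hive.ql.exec.repl.ReplDumpTask;

    // Hypothetical demo of the per-table constraint file naming.
    public class ConstraintFileNameDemo {
      public static void main(String[] args) {
        String tblName = "t1"; // sample table name, not from the patch
        for (ReplDumpTask.ConstraintFileType type :
            ReplDumpTask.ConstraintFileType.values()) {
          // COMMON     -> "c_t1" (primary key, unique, not-null)
          // FOREIGNKEY -> "f_t1" (foreign keys only)
          System.out.println(type.getName() + " -> " + type.getPrefix() + tblName);
        }
      }
    }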

http://git-wip-us.apache.org/repos/asf/hive/blob/c0734ac9/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/ConstraintEventsIterator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/ConstraintEventsIterator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/ConstraintEventsIterator.java
index 349b414..32518e0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/ConstraintEventsIterator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/ConstraintEventsIterator.java
@@ -25,7 +25,9 @@ import java.util.Iterator;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.repl.ReplDumpTask.ConstraintFileType;
 import org.apache.hadoop.hive.ql.parse.EximUtil;
 import org.apache.hadoop.hive.ql.parse.ReplicationSemanticAnalyzer;
 
@@ -36,15 +38,20 @@ public class ConstraintEventsIterator implements Iterator<FSConstraintEvent> {
   private int currentConstraintIndex;
   private FileSystem fs;
   private Path path;
+  private ConstraintFileType mode = ConstraintFileType.COMMON;
 
   public ConstraintEventsIterator(String dumpDirectory, HiveConf hiveConf) throws IOException {
     path = new Path(dumpDirectory);
     fs = path.getFileSystem(hiveConf);
   }
 
-  private FileStatus[] listConstraintFilesInDBDir(FileSystem fs, Path dbDir) {
+  private FileStatus[] listConstraintFilesInDBDir(FileSystem fs, Path dbDir, String prefix) {
     try {
-      return fs.listStatus(new Path(dbDir, ReplicationSemanticAnalyzer.CONSTRAINTS_ROOT_DIR_NAME));
+      return fs.listStatus(new Path(dbDir, ReplicationSemanticAnalyzer.CONSTRAINTS_ROOT_DIR_NAME), new PathFilter() {
+        public boolean accept(Path p) {
+          return p.getName().startsWith(prefix);
+        }
+      });
     } catch (FileNotFoundException e) {
       return new FileStatus[]{};
     } catch (IOException e) {
@@ -52,8 +59,7 @@ public class ConstraintEventsIterator implements Iterator<FSConstraintEvent> {
     }
   }
 
-  @Override
-  public boolean hasNext() {
+  boolean hasNext(ConstraintFileType type) {
     if (dbDirs == null) {
       try {
         dbDirs = fs.listStatus(path, EximUtil.getDirectoryFilter(fs));
@@ -63,7 +69,7 @@ public class ConstraintEventsIterator implements Iterator<FSConstraintEvent> {
       currentDbIndex = 0;
       if (dbDirs.length != 0) {
         currentConstraintIndex = 0;
-        constraintFiles = listConstraintFilesInDBDir(fs, dbDirs[0].getPath());
+        constraintFiles = listConstraintFilesInDBDir(fs, dbDirs[0].getPath(), type.getPrefix());
       }
     }
     if ((currentDbIndex < dbDirs.length) && (currentConstraintIndex < constraintFiles.length)) {
@@ -73,7 +79,7 @@ public class ConstraintEventsIterator implements Iterator<FSConstraintEvent> {
       currentDbIndex ++;
       if (currentDbIndex < dbDirs.length) {
         currentConstraintIndex = 0;
-        constraintFiles = listConstraintFilesInDBDir(fs, dbDirs[currentDbIndex].getPath());
+        constraintFiles = listConstraintFilesInDBDir(fs, dbDirs[currentDbIndex].getPath(), type.getPrefix());
       } else {
         constraintFiles = null;
       }
@@ -82,6 +88,22 @@ public class ConstraintEventsIterator implements Iterator<FSConstraintEvent> {
   }
 
   @Override
+  public boolean hasNext() {
+    if (mode == ConstraintFileType.COMMON) {
+      if (hasNext(ConstraintFileType.COMMON)) {
+        return true;
+      } else {
+        // Switch to iterate foreign keys
+        mode = ConstraintFileType.FOREIGNKEY;
+        currentDbIndex = 0;
+        currentConstraintIndex = 0;
+        dbDirs = null;
+      }
+    }
+    return hasNext(ConstraintFileType.FOREIGNKEY);
+  }
+
+  @Override
   public FSConstraintEvent next() {
     int thisIndex = currentConstraintIndex;
     currentConstraintIndex++;
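
With this change hasNext() performs a two-phase scan: it first walks every
database directory returning the "c_"-prefixed (common constraint) files,
then resets its cursors and walks the directories again returning the
"f_"-prefixed (foreign key) files, so foreign key events are always emitted
after the constraints they may reference. A minimal sketch of that two-phase
pattern over plain strings (the class and file names are illustrative, not
from the patch):

    import java.util.Arrays;
    import java.util.List;

    // Hypothetical demo: emit "c_" entries first, then "f_" entries,
    // mirroring the COMMON -> FOREIGNKEY mode switch in hasNext() above.
    public class TwoPhaseScanDemo {
      public static void main(String[] args) {
        List<String> files = Arrays.asList("c_t1", "f_t1", "c_t2", "f_t2");
        for (String prefix : new String[] {"c_", "f_"}) {
          files.stream()
              .filter(name -> name.startsWith(prefix))
              .forEach(System.out::println); // prints c_t1, c_t2, f_t1, f_t2
        }
      }
    }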

http://git-wip-us.apache.org/repos/asf/hive/blob/c0734ac9/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadConstraint.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadConstraint.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadConstraint.java
index d1de15a..60c85f5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadConstraint.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadConstraint.java
@@ -73,41 +73,49 @@ public class LoadConstraint {
       String nnsString = json.getString("nns");
       List<Task<? extends Serializable>> tasks = new ArrayList<Task<? extends Serializable>>();
 
-      AddPrimaryKeyHandler pkHandler = new AddPrimaryKeyHandler();
-      DumpMetaData pkDumpMetaData = new DumpMetaData(fromPath, DumpType.EVENT_ADD_PRIMARYKEY, Long.MAX_VALUE, Long.MAX_VALUE, null,
-          context.hiveConf);
-      pkDumpMetaData.setPayload(pksString);
-      tasks.addAll(pkHandler.handle(
-          new MessageHandler.Context(
-              dbNameToLoadIn, null, fromPath.toString(), null, pkDumpMetaData, context.hiveConf,
-              context.hiveDb, null, LOG)));
+      if (pksString != null && !pksString.isEmpty()) {
+        AddPrimaryKeyHandler pkHandler = new AddPrimaryKeyHandler();
+        DumpMetaData pkDumpMetaData = new DumpMetaData(fromPath, DumpType.EVENT_ADD_PRIMARYKEY, Long.MAX_VALUE, Long.MAX_VALUE, null,
+            context.hiveConf);
+        pkDumpMetaData.setPayload(pksString);
+        tasks.addAll(pkHandler.handle(
+            new MessageHandler.Context(
+                dbNameToLoadIn, null, fromPath.toString(), null, pkDumpMetaData, context.hiveConf,
+                context.hiveDb, null, LOG)));
+      }
 
-      AddUniqueConstraintHandler ukHandler = new AddUniqueConstraintHandler();
-      DumpMetaData ukDumpMetaData = new DumpMetaData(fromPath, DumpType.EVENT_ADD_UNIQUECONSTRAINT, Long.MAX_VALUE, Long.MAX_VALUE, null,
-          context.hiveConf);
-      ukDumpMetaData.setPayload(uksString);
-      tasks.addAll(ukHandler.handle(
-          new MessageHandler.Context(
-              dbNameToLoadIn, null, fromPath.toString(), null, ukDumpMetaData, context.hiveConf,
-              context.hiveDb, null, LOG)));
+      if (uksString != null && !uksString.isEmpty()) {
+        AddUniqueConstraintHandler ukHandler = new AddUniqueConstraintHandler();
+        DumpMetaData ukDumpMetaData = new DumpMetaData(fromPath, DumpType.EVENT_ADD_UNIQUECONSTRAINT, Long.MAX_VALUE, Long.MAX_VALUE, null,
+            context.hiveConf);
+        ukDumpMetaData.setPayload(uksString);
+        tasks.addAll(ukHandler.handle(
+            new MessageHandler.Context(
+                dbNameToLoadIn, null, fromPath.toString(), null, ukDumpMetaData, context.hiveConf,
+                context.hiveDb, null, LOG)));
+      }
 
-      AddNotNullConstraintHandler nnHandler = new AddNotNullConstraintHandler();
-      DumpMetaData nnDumpMetaData = new DumpMetaData(fromPath, DumpType.EVENT_ADD_NOTNULLCONSTRAINT, Long.MAX_VALUE, Long.MAX_VALUE, null,
-          context.hiveConf);
-      nnDumpMetaData.setPayload(nnsString);
-      tasks.addAll(nnHandler.handle(
-          new MessageHandler.Context(
-              dbNameToLoadIn, null, fromPath.toString(), null, nnDumpMetaData, context.hiveConf,
-              context.hiveDb, null, LOG)));
+      if (nnsString != null && !nnsString.isEmpty()) {
+        AddNotNullConstraintHandler nnHandler = new AddNotNullConstraintHandler();
+        DumpMetaData nnDumpMetaData = new DumpMetaData(fromPath, DumpType.EVENT_ADD_NOTNULLCONSTRAINT, Long.MAX_VALUE, Long.MAX_VALUE, null,
+            context.hiveConf);
+        nnDumpMetaData.setPayload(nnsString);
+        tasks.addAll(nnHandler.handle(
+            new MessageHandler.Context(
+                dbNameToLoadIn, null, fromPath.toString(), null, nnDumpMetaData, context.hiveConf,
+                context.hiveDb, null, LOG)));
+      }
 
-      AddForeignKeyHandler fkHandler = new AddForeignKeyHandler();
-      DumpMetaData fkDumpMetaData = new DumpMetaData(fromPath, DumpType.EVENT_ADD_FOREIGNKEY, Long.MAX_VALUE, Long.MAX_VALUE, null,
-          context.hiveConf);
-      fkDumpMetaData.setPayload(fksString);
-      tasks.addAll(fkHandler.handle(
-          new MessageHandler.Context(
-              dbNameToLoadIn, null, fromPath.toString(), null, fkDumpMetaData, context.hiveConf,
-              context.hiveDb, null, LOG)));
+      if (fksString != null && !fksString.isEmpty()) {
+        AddForeignKeyHandler fkHandler = new AddForeignKeyHandler();
+        DumpMetaData fkDumpMetaData = new DumpMetaData(fromPath, DumpType.EVENT_ADD_FOREIGNKEY, Long.MAX_VALUE, Long.MAX_VALUE, null,
+            context.hiveConf);
+        fkDumpMetaData.setPayload(fksString);
+        tasks.addAll(fkHandler.handle(
+            new MessageHandler.Context(
+                dbNameToLoadIn, null, fromPath.toString(), null, fkDumpMetaData, context.hiveConf,
+                context.hiveDb, null, LOG)));
+      }
 
       tasks.forEach(tracker::addTask);
       return tracker;
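
Since each dump file now carries only a subset of the constraint payloads,
the load side wraps every handler in a presence check instead of assuming
all four payload strings are populated. A minimal sketch of the repeated
guard, as a hypothetical helper (not part of the patch):

    // Hypothetical helper expressing the guard used for each payload above.
    private static boolean hasPayload(String payload) {
      return payload != null && !payload.isEmpty();
    }

    // Usage sketch: build tasks only for the constraint types present.
    //   if (hasPayload(pksString)) { /* AddPrimaryKeyHandler ... */ }
    //   if (hasPayload(fksString)) { /* AddForeignKeyHandler ... */ }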
