HIVE-18298: Fix TestReplicationScenarios.testConstraints (Daniel Dai, reviewed by Sankar Hariappan)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/c0734ac9 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/c0734ac9 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/c0734ac9 Branch: refs/heads/standalone-metastore Commit: c0734ac91bdd3e81b17d93fcfeddd4503430eea8 Parents: ac24781 Author: Daniel Dai <da...@hortonworks.com> Authored: Sat Jan 13 21:02:51 2018 -0800 Committer: Daniel Dai <da...@hortonworks.com> Committed: Sat Jan 13 21:02:51 2018 -0800 ---------------------------------------------------------------------- .../hadoop/hive/ql/exec/repl/ReplDumpTask.java | 31 +++++++-- .../filesystem/ConstraintEventsIterator.java | 34 +++++++-- .../repl/bootstrap/load/LoadConstraint.java | 72 +++++++++++--------- 3 files changed, 95 insertions(+), 42 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/c0734ac9/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java index bd4b0bd..ce0757c 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java @@ -74,6 +74,21 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable { private static final String FUNCTIONS_ROOT_DIR_NAME = "_functions"; private static final String CONSTRAINTS_ROOT_DIR_NAME = "_constraints"; private static final String FUNCTION_METADATA_FILE_NAME = "_metadata"; + public enum ConstraintFileType {COMMON("common", "c_"), FOREIGNKEY("fk", "f_"); + private final String name; + private final String prefix; + private ConstraintFileType(String name, String prefix) { + this.name = name; + this.prefix = prefix; + } + public String getName() { + return this.name; + } + + public String getPrefix() { + return prefix; + } + } private Logger LOG = LoggerFactory.getLogger(ReplDumpTask.class); private ReplLogger replLogger; @@ -316,17 +331,25 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable { private void dumpConstraintMetadata(String dbName, String tblName, Path dbRoot) throws Exception { try { Path constraintsRoot = new Path(dbRoot, CONSTRAINTS_ROOT_DIR_NAME); - Path constraintsFile = new Path(constraintsRoot, tblName); + Path commonConstraintsFile = new Path(constraintsRoot, ConstraintFileType.COMMON.getPrefix() + tblName); + Path fkConstraintsFile = new Path(constraintsRoot, ConstraintFileType.FOREIGNKEY.getPrefix() + tblName); Hive db = getHive(); List<SQLPrimaryKey> pks = db.getPrimaryKeyList(dbName, tblName); List<SQLForeignKey> fks = db.getForeignKeyList(dbName, tblName); List<SQLUniqueConstraint> uks = db.getUniqueConstraintList(dbName, tblName); List<SQLNotNullConstraint> nns = db.getNotNullConstraintList(dbName, tblName); - if ((pks != null && !pks.isEmpty()) || (fks != null && !fks.isEmpty()) || (uks != null && !uks.isEmpty()) + if ((pks != null && !pks.isEmpty()) || (uks != null && !uks.isEmpty()) || (nns != null && !nns.isEmpty())) { try (JsonWriter jsonWriter = - new JsonWriter(constraintsFile.getFileSystem(conf), constraintsFile)) { - ConstraintsSerializer serializer = new ConstraintsSerializer(pks, fks, uks, nns, conf); + new JsonWriter(commonConstraintsFile.getFileSystem(conf), commonConstraintsFile)) { + ConstraintsSerializer serializer = new ConstraintsSerializer(pks, null, uks, nns, conf); + serializer.writeTo(jsonWriter, null); + } + } + if (fks != null && !fks.isEmpty()) { + try (JsonWriter jsonWriter = + new JsonWriter(fkConstraintsFile.getFileSystem(conf), fkConstraintsFile)) { + ConstraintsSerializer serializer = new ConstraintsSerializer(null, fks, null, null, conf); serializer.writeTo(jsonWriter, null); } } http://git-wip-us.apache.org/repos/asf/hive/blob/c0734ac9/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/ConstraintEventsIterator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/ConstraintEventsIterator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/ConstraintEventsIterator.java index 349b414..32518e0 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/ConstraintEventsIterator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/ConstraintEventsIterator.java @@ -25,7 +25,9 @@ import java.util.Iterator; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathFilter; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.exec.repl.ReplDumpTask.ConstraintFileType; import org.apache.hadoop.hive.ql.parse.EximUtil; import org.apache.hadoop.hive.ql.parse.ReplicationSemanticAnalyzer; @@ -36,15 +38,20 @@ public class ConstraintEventsIterator implements Iterator<FSConstraintEvent> { private int currentConstraintIndex; private FileSystem fs; private Path path; + private ConstraintFileType mode = ConstraintFileType.COMMON; public ConstraintEventsIterator(String dumpDirectory, HiveConf hiveConf) throws IOException { path = new Path(dumpDirectory); fs = path.getFileSystem(hiveConf); } - private FileStatus[] listConstraintFilesInDBDir(FileSystem fs, Path dbDir) { + private FileStatus[] listConstraintFilesInDBDir(FileSystem fs, Path dbDir, String prefix) { try { - return fs.listStatus(new Path(dbDir, ReplicationSemanticAnalyzer.CONSTRAINTS_ROOT_DIR_NAME)); + return fs.listStatus(new Path(dbDir, ReplicationSemanticAnalyzer.CONSTRAINTS_ROOT_DIR_NAME), new PathFilter() { + public boolean accept(Path p) { + return p.getName().startsWith(prefix); + } + }); } catch (FileNotFoundException e) { return new FileStatus[]{}; } catch (IOException e) { @@ -52,8 +59,7 @@ public class ConstraintEventsIterator implements Iterator<FSConstraintEvent> { } } - @Override - public boolean hasNext() { + boolean hasNext(ConstraintFileType type) { if (dbDirs == null) { try { dbDirs = fs.listStatus(path, EximUtil.getDirectoryFilter(fs)); @@ -63,7 +69,7 @@ public class ConstraintEventsIterator implements Iterator<FSConstraintEvent> { currentDbIndex = 0; if (dbDirs.length != 0) { currentConstraintIndex = 0; - constraintFiles = listConstraintFilesInDBDir(fs, dbDirs[0].getPath()); + constraintFiles = listConstraintFilesInDBDir(fs, dbDirs[0].getPath(), type.getPrefix()); } } if ((currentDbIndex < dbDirs.length) && (currentConstraintIndex < constraintFiles.length)) { @@ -73,7 +79,7 @@ public class ConstraintEventsIterator implements Iterator<FSConstraintEvent> { currentDbIndex ++; if (currentDbIndex < dbDirs.length) { currentConstraintIndex = 0; - constraintFiles = listConstraintFilesInDBDir(fs, dbDirs[currentDbIndex].getPath()); + constraintFiles = listConstraintFilesInDBDir(fs, dbDirs[currentDbIndex].getPath(), type.getPrefix()); } else { constraintFiles = null; } @@ -82,6 +88,22 @@ public class ConstraintEventsIterator implements Iterator<FSConstraintEvent> { } @Override + public boolean hasNext() { + if (mode == ConstraintFileType.COMMON) { + if (hasNext(ConstraintFileType.COMMON)) { + return true; + } else { + // Switch to iterate foreign keys + mode = ConstraintFileType.FOREIGNKEY; + currentDbIndex = 0; + currentConstraintIndex = 0; + dbDirs = null; + } + } + return hasNext(ConstraintFileType.FOREIGNKEY); + } + + @Override public FSConstraintEvent next() { int thisIndex = currentConstraintIndex; currentConstraintIndex++; http://git-wip-us.apache.org/repos/asf/hive/blob/c0734ac9/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadConstraint.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadConstraint.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadConstraint.java index d1de15a..60c85f5 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadConstraint.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadConstraint.java @@ -73,41 +73,49 @@ public class LoadConstraint { String nnsString = json.getString("nns"); List<Task<? extends Serializable>> tasks = new ArrayList<Task<? extends Serializable>>(); - AddPrimaryKeyHandler pkHandler = new AddPrimaryKeyHandler(); - DumpMetaData pkDumpMetaData = new DumpMetaData(fromPath, DumpType.EVENT_ADD_PRIMARYKEY, Long.MAX_VALUE, Long.MAX_VALUE, null, - context.hiveConf); - pkDumpMetaData.setPayload(pksString); - tasks.addAll(pkHandler.handle( - new MessageHandler.Context( - dbNameToLoadIn, null, fromPath.toString(), null, pkDumpMetaData, context.hiveConf, - context.hiveDb, null, LOG))); + if (pksString != null && !pksString.isEmpty()) { + AddPrimaryKeyHandler pkHandler = new AddPrimaryKeyHandler(); + DumpMetaData pkDumpMetaData = new DumpMetaData(fromPath, DumpType.EVENT_ADD_PRIMARYKEY, Long.MAX_VALUE, Long.MAX_VALUE, null, + context.hiveConf); + pkDumpMetaData.setPayload(pksString); + tasks.addAll(pkHandler.handle( + new MessageHandler.Context( + dbNameToLoadIn, null, fromPath.toString(), null, pkDumpMetaData, context.hiveConf, + context.hiveDb, null, LOG))); + } - AddUniqueConstraintHandler ukHandler = new AddUniqueConstraintHandler(); - DumpMetaData ukDumpMetaData = new DumpMetaData(fromPath, DumpType.EVENT_ADD_UNIQUECONSTRAINT, Long.MAX_VALUE, Long.MAX_VALUE, null, - context.hiveConf); - ukDumpMetaData.setPayload(uksString); - tasks.addAll(ukHandler.handle( - new MessageHandler.Context( - dbNameToLoadIn, null, fromPath.toString(), null, ukDumpMetaData, context.hiveConf, - context.hiveDb, null, LOG))); + if (uksString != null && !uksString.isEmpty()) { + AddUniqueConstraintHandler ukHandler = new AddUniqueConstraintHandler(); + DumpMetaData ukDumpMetaData = new DumpMetaData(fromPath, DumpType.EVENT_ADD_UNIQUECONSTRAINT, Long.MAX_VALUE, Long.MAX_VALUE, null, + context.hiveConf); + ukDumpMetaData.setPayload(uksString); + tasks.addAll(ukHandler.handle( + new MessageHandler.Context( + dbNameToLoadIn, null, fromPath.toString(), null, ukDumpMetaData, context.hiveConf, + context.hiveDb, null, LOG))); + } - AddNotNullConstraintHandler nnHandler = new AddNotNullConstraintHandler(); - DumpMetaData nnDumpMetaData = new DumpMetaData(fromPath, DumpType.EVENT_ADD_NOTNULLCONSTRAINT, Long.MAX_VALUE, Long.MAX_VALUE, null, - context.hiveConf); - nnDumpMetaData.setPayload(nnsString); - tasks.addAll(nnHandler.handle( - new MessageHandler.Context( - dbNameToLoadIn, null, fromPath.toString(), null, nnDumpMetaData, context.hiveConf, - context.hiveDb, null, LOG))); + if (nnsString != null && !nnsString.isEmpty()) { + AddNotNullConstraintHandler nnHandler = new AddNotNullConstraintHandler(); + DumpMetaData nnDumpMetaData = new DumpMetaData(fromPath, DumpType.EVENT_ADD_NOTNULLCONSTRAINT, Long.MAX_VALUE, Long.MAX_VALUE, null, + context.hiveConf); + nnDumpMetaData.setPayload(nnsString); + tasks.addAll(nnHandler.handle( + new MessageHandler.Context( + dbNameToLoadIn, null, fromPath.toString(), null, nnDumpMetaData, context.hiveConf, + context.hiveDb, null, LOG))); + } - AddForeignKeyHandler fkHandler = new AddForeignKeyHandler(); - DumpMetaData fkDumpMetaData = new DumpMetaData(fromPath, DumpType.EVENT_ADD_FOREIGNKEY, Long.MAX_VALUE, Long.MAX_VALUE, null, - context.hiveConf); - fkDumpMetaData.setPayload(fksString); - tasks.addAll(fkHandler.handle( - new MessageHandler.Context( - dbNameToLoadIn, null, fromPath.toString(), null, fkDumpMetaData, context.hiveConf, - context.hiveDb, null, LOG))); + if (fksString != null && !fksString.isEmpty()) { + AddForeignKeyHandler fkHandler = new AddForeignKeyHandler(); + DumpMetaData fkDumpMetaData = new DumpMetaData(fromPath, DumpType.EVENT_ADD_FOREIGNKEY, Long.MAX_VALUE, Long.MAX_VALUE, null, + context.hiveConf); + fkDumpMetaData.setPayload(fksString); + tasks.addAll(fkHandler.handle( + new MessageHandler.Context( + dbNameToLoadIn, null, fromPath.toString(), null, fkDumpMetaData, context.hiveConf, + context.hiveDb, null, LOG))); + } tasks.forEach(tracker::addTask); return tracker;