This is an automated email from the ASF dual-hosted git repository.

anishek pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
     new 796a9c5  HIVE-23020: Avoid using _files for replication data copy during incremental run (Pravin Kumar Sinha, reviewed by Aasha Medhi)
796a9c5 is described below

commit 796a9c55757e8c384f86cfd2fd8842bbb2cdd31f
Author: Pravin Kumar Sinha <mailpravi...@gmail.com>
AuthorDate: Thu Apr 9 17:22:37 2020 +0530

    HIVE-23020: Avoid using _files for replication data copy during incremental run (Pravin Kumar Sinha, reviewed by Aasha Medhi)
---
 .../ql/parse/TestMetaStoreEventListenerInRepl.java |   2 +-
 .../hive/ql/parse/TestReplicationScenarios.java    |  52 +++++++++-
 .../TestReplicationScenariosAcrossInstances.java   |   2 +-
 .../apache/hadoop/hive/ql/exec/ReplCopyTask.java   | 110 ++++++++++-----------
 .../hive/ql/parse/ImportSemanticAnalyzer.java      |  19 +++-
 .../hadoop/hive/ql/parse/repl/CopyUtils.java       |  33 +++++++
 .../repl/dump/events/AbstractEventHandler.java     |  43 +++-----
 .../repl/dump/events/AddPartitionHandler.java      |   6 +-
 .../parse/repl/dump/events/CommitTxnHandler.java   |  23 +----
 .../parse/repl/dump/events/CreateTableHandler.java |   7 +-
 .../ql/parse/repl/dump/events/InsertHandler.java   |  25 ++---
 11 files changed, 186 insertions(+), 136 deletions(-)
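Before the per-file diffs, the shape of the change in one place: until now, an incremental REPL DUMP wrote each data file's encoded source/CM URI into an _files listing per event and left the actual copy to REPL LOAD; after this patch the dump copies the files itself, under each event's data directory. Below is a minimal sketch of the two shapes, deliberately plain Java rather than Hive code; all paths and names are illustrative.

// A minimal sketch (not Hive code; names illustrative) of the conceptual change:
// before, the dump recorded a pointer per file and deferred the copy to load;
// after, the dump copies each file eagerly under <eventRoot>/data[/<partition>].
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.nio.file.StandardOpenOption;

public class DumpCopySketch {

  // Old shape: append an encoded source/CM URI to the event's _files listing.
  static void writeFilesEntry(Path eventRoot, String encodedSrcUri) throws IOException {
    Files.write(eventRoot.resolve("_files"), (encodedSrcUri + "\n").getBytes(),
        StandardOpenOption.CREATE, StandardOpenOption.APPEND);
  }

  // New shape: copy the data file now, into the event's data directory.
  static void copyDataFile(Path eventRoot, Path srcFile, String partition) throws IOException {
    Path dataDir = eventRoot.resolve("data");
    if (partition != null) {
      dataDir = dataDir.resolve(partition);
    }
    Files.createDirectories(dataDir);
    Files.copy(srcFile, dataDir.resolve(srcFile.getFileName()),
        StandardCopyOption.REPLACE_EXISTING);
  }
}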
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestMetaStoreEventListenerInRepl.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestMetaStoreEventListenerInRepl.java
index 703d16f..5dbee9e 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestMetaStoreEventListenerInRepl.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestMetaStoreEventListenerInRepl.java
@@ -181,7 +181,7 @@ public class TestMetaStoreEventListenerInRepl {
     eventsMap = prepareIncData(primaryDbName);
     LOG.info(testName.getMethodName() + ": first incremental dump and load.");
-    primary.run("use " + primaryDbName)
+    WarehouseInstance.Tuple incre = primary.run("use " + primaryDbName)
         .dump(primaryDbName);
     replica.load(replicatedDbName, primaryDbName);
     ReplMetaStoreEventListenerTestImpl.checkEventSanity(eventsMap, replicatedDbName);
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
index c79d4c3..fa96b87 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
@@ -1506,6 +1506,13 @@ public class TestReplicationScenarios {
 
     Tuple incrementalDump = incrementalLoadAndVerify(dbName, replDbName);
 
+    //Verify dump data structure
+    Path hiveDumpDir = new Path(incrementalDump.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR);
+    FileSystem fs = FileSystem.get(hiveDumpDir.toUri(), hconf);
+    verifyDataFileExist(fs, hiveDumpDir, null, new Path(unptnLocn).getName());
+    verifyDataListFileDoesNotExist(fs, hiveDumpDir, null);
+
     verifyRun("SELECT * from " + replDbName + ".unptned_late", unptnData, driverMirror);
 
     run("ALTER TABLE " + dbName + ".ptned ADD PARTITION (b=1)", driver);
@@ -1526,7 +1533,13 @@ public class TestReplicationScenarios {
         + ".ptned WHERE b=2", driver);
     verifySetup("SELECT a from " + dbName + ".ptned_late WHERE b=2", ptnData2, driver);
 
-    incrementalLoadAndVerify(dbName, replDbName);
+    incrementalDump = incrementalLoadAndVerify(dbName, replDbName);
+
+    hiveDumpDir = new Path(incrementalDump.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR);
+    verifyDataFileExist(fs, hiveDumpDir, "b=1", new Path(ptnLocn1).getName());
+    verifyDataFileExist(fs, hiveDumpDir, "b=2", new Path(ptnLocn2).getName());
+    verifyDataListFileDoesNotExist(fs, hiveDumpDir, "b=1");
+    verifyDataListFileDoesNotExist(fs, hiveDumpDir, "b=2");
+
     verifyRun("SELECT a from " + replDbName + ".ptned_late WHERE b=1", ptnData1, driverMirror);
     verifyRun("SELECT a from " + replDbName + ".ptned_late WHERE b=2", ptnData2, driverMirror);
     verifyRun("SELECT a from " + replDbName + ".ptned WHERE b=1", ptnData1, driverMirror);
@@ -3788,6 +3801,43 @@ public class TestReplicationScenarios {
     assertFalse(success);
   }
 
+  private void verifyDataFileExist(FileSystem fs, Path hiveDumpDir, String part, String dataFile) throws IOException {
+    FileStatus[] eventFileStatuses = fs.listStatus(hiveDumpDir);
+    boolean dataFileFound = false;
+    for (FileStatus eventFileStatus: eventFileStatuses) {
+      String dataRelativePath = null;
+      if (part == null) {
+        dataRelativePath = EximUtil.DATA_PATH_NAME + File.separator + dataFile;
+      } else {
+        dataRelativePath = EximUtil.DATA_PATH_NAME + File.separator + part + File.separator + dataFile;
+      }
+      if (fs.exists(new Path(eventFileStatus.getPath(), dataRelativePath))) {
+        dataFileFound = true;
+        break;
+      }
+    }
+    assertTrue(dataFileFound);
+  }
+
+  private void verifyDataListFileDoesNotExist(FileSystem fs, Path hiveDumpDir, String part)
+          throws IOException {
+    FileStatus[] eventFileStatuses = fs.listStatus(hiveDumpDir);
+    boolean dataListFileFound = false;
+    for (FileStatus eventFileStatus: eventFileStatuses) {
+      String dataRelativePath = null;
+      if (part == null) {
+        dataRelativePath = EximUtil.DATA_PATH_NAME + File.separator + EximUtil.FILES_NAME;
+      } else {
+        dataRelativePath = part + File.separator + EximUtil.FILES_NAME;
+      }
+      if (fs.exists(new Path(eventFileStatus.getPath(), dataRelativePath))) {
+        dataListFileFound = true;
+        break;
+      }
+    }
+    assertFalse(dataListFileFound);
+  }
+
   private void verifyRunWithPatternMatch(String cmd, String key, String pattern,
                                          IDriver myDriver) throws IOException {
     run(cmd, myDriver);
     List<String> results = getOutput(myDriver);
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
index 33124c8..e1b8b81 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
@@ -925,7 +925,7 @@ public class TestReplicationScenariosAcrossInstances extends BaseReplicationAcro
     Path path = new Path(hiveDumpDir);
     FileSystem fs = path.getFileSystem(conf);
     FileStatus[] fileStatus = fs.listStatus(path);
-    int numEvents = fileStatus.length - 3; //one is metadata file and one data dir and one is _dump ack
+    int numEvents = fileStatus.length - 2; //one is metadata file and one is _dump ack
 
     replica.load(replicatedDbName, primaryDbName,
         Collections.singletonList("'hive.repl.approx.max.load.tasks'='1'"))
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java
index 3c7274c..51c3b6f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java
@@ -102,35 +102,6 @@ public class ReplCopyTask extends Task<ReplCopyWork> implements Serializable {
     }
   }
 
-  private void renameFileCopiedFromCmPath(Path toPath, FileSystem dstFs, List<ReplChangeManager.FileInfo> srcFiles)
-          throws IOException {
-    for (ReplChangeManager.FileInfo srcFile : srcFiles) {
-      if (srcFile.isUseSourcePath()) {
-        continue;
-      }
-      String destFileName = srcFile.getCmPath().getName();
-      Path destRoot = CopyUtils.getCopyDestination(srcFile, toPath);
-      Path destFile = new Path(destRoot, destFileName);
-      if (dstFs.exists(destFile)) {
-        String destFileWithSourceName = srcFile.getSourcePath().getName();
-        Path newDestFile = new Path(destRoot, destFileWithSourceName);
-
-        // if the new file exist then delete it before renaming, to avoid rename failure. If the copy is done
-        // directly to table path (bypassing staging directory) then there might be some stale files from previous
-        // incomplete/failed load. No need of recycle as this is a case of stale file.
-        if (dstFs.exists(newDestFile)) {
-          LOG.debug(" file " + newDestFile + " is deleted before renaming");
-          dstFs.delete(newDestFile, true);
-        }
-        boolean result = dstFs.rename(destFile, newDestFile);
-        if (!result) {
-          throw new IllegalStateException(
-                  "could not rename " + destFile.getName() + " to " + newDestFile.getName());
-        }
-      }
-    }
-  }
-
   @Override
   public int execute() {
     LOG.debug("ReplCopyTask.execute()");
@@ -190,32 +161,16 @@ public class ReplCopyTask extends Task<ReplCopyWork> implements Serializable {
         }
 
         if (work.isCopyToMigratedTxnTable()) {
-          if (work.isNeedCheckDuplicateCopy()) {
-            updateSrcFileListForDupCopy(dstFs, toPath, srcFiles,
-                    ReplUtils.REPL_BOOTSTRAP_MIGRATION_BASE_WRITE_ID, ReplUtils.REPL_BOOTSTRAP_MIGRATION_BASE_STMT_ID);
-            if (srcFiles.isEmpty()) {
-              LOG.info("All files are already present in the base directory. Skipping copy task.");
-              return 0;
-            }
+          if (isDuplicateCopy(dstFs, toPath, srcFiles)) {
+            return 0;
           }
-          // If direct (move optimized) copy is triggered for data to a migrated transactional table, then it
-          // should have a write ID allocated by parent ReplTxnTask. Use it to create the base or delta directory.
-          // The toPath received in ReplCopyWork is pointing to table/partition base location.
-          // So, just need to append the base or delta directory.
-          // getDeleteDestIfExist returns true if it is repl load for replace/insert overwrite event and
-          // hence need to create base directory. If false, then it is repl load for regular insert into or
-          // load flow and hence just create delta directory.
-          Long writeId = ReplUtils.getMigrationCurrentTblWriteId(conf);
-          if (writeId == null) {
+
+          Path modifiedToPath = getModifiedToPath(toPath);
+          if (modifiedToPath == null) {
             console.printError("ReplCopyTask : Write id is not set in the config by open txn task for migration");
             return 6;
           }
-          // Set stmt id 0 for bootstrap load as the directory needs to be searched during incremental load to avoid any
-          // duplicate copy from the source. Check HIVE-21197 for more detail.
-          int stmtId = (writeId.equals(ReplUtils.REPL_BOOTSTRAP_MIGRATION_BASE_WRITE_ID)) ?
-                  ReplUtils.REPL_BOOTSTRAP_MIGRATION_BASE_STMT_ID :
-                  context.getHiveTxnManager().getStmtIdAndIncrement();
-          toPath = new Path(toPath, AcidUtils.baseOrDeltaSubdir(work.getDeleteDestIfExist(), writeId, writeId, stmtId));
+          toPath = modifiedToPath;
         }
       } else {
         // This flow is usually taken for IMPORT command
@@ -231,12 +186,22 @@ public class ReplCopyTask extends Task<ReplCopyWork> implements Serializable {
           return 0;
         }
       }
-
       for (FileStatus oneSrc : srcs) {
         console.printInfo("Copying file: " + oneSrc.getPath().toString());
         LOG.debug("ReplCopyTask :cp:{}=>{}", oneSrc.getPath(), toPath);
-        srcFiles.add(new ReplChangeManager.FileInfo(oneSrc.getPath().getFileSystem(conf),
-                oneSrc.getPath(), null));
+        srcFiles.add(new ReplChangeManager.FileInfo(oneSrc.getPath().getFileSystem(conf), oneSrc.getPath(), null));
+      }
+      if (work.isCopyToMigratedTxnTable()) {
+        if (isDuplicateCopy(dstFs, toPath, srcFiles)) {
+          return 0;
+        }
+
+        Path modifiedToPath = getModifiedToPath(toPath);
+        if (modifiedToPath == null) {
+          console.printError("ReplCopyTask : Write id is not set in the config by open txn task for migration");
+          return 6;
+        }
+        toPath = modifiedToPath;
       }
     }
@@ -255,12 +220,13 @@ public class ReplCopyTask extends Task<ReplCopyWork> implements Serializable {
         return 2;
       }
       // Copy the files from different source file systems to one destination directory
-      new CopyUtils(rwork.distCpDoAsUser(), conf, dstFs).copyAndVerify(toPath, srcFiles, fromPath);
+      CopyUtils copyUtils = new CopyUtils(rwork.distCpDoAsUser(), conf, dstFs);
+      copyUtils.copyAndVerify(toPath, srcFiles, fromPath);
 
       // If a file is copied from CM path, then need to rename them using original source file name
       // This is needed to avoid having duplicate files in target if same event is applied twice
       // where the first event refers to source path and second event refers to CM path
-      renameFileCopiedFromCmPath(toPath, dstFs, srcFiles);
+      copyUtils.renameFileCopiedFromCmPath(toPath, dstFs, srcFiles);
       return 0;
     } catch (Exception e) {
       LOG.error(StringUtils.stringifyException(e));
@@ -269,6 +235,38 @@ public class ReplCopyTask extends Task<ReplCopyWork> implements Serializable {
     }
   }
 
+  private boolean isDuplicateCopy(FileSystem dstFs, Path toPath, List<ReplChangeManager.FileInfo> srcFiles)
+          throws IOException {
+    if (work.isNeedCheckDuplicateCopy()) {
+      updateSrcFileListForDupCopy(dstFs, toPath, srcFiles,
+              ReplUtils.REPL_BOOTSTRAP_MIGRATION_BASE_WRITE_ID, ReplUtils.REPL_BOOTSTRAP_MIGRATION_BASE_STMT_ID);
+      if (srcFiles.isEmpty()) {
+        LOG.info("All files are already present in the base directory. Skipping copy task.");
+        return true;
+      }
+    }
+    return false;
+  }
+
+  private Path getModifiedToPath(Path toPath) {
+    // If direct (move optimized) copy is triggered for data to a migrated transactional table, then it
+    // should have a write ID allocated by parent ReplTxnTask. Use it to create the base or delta directory.
+    // The toPath received in ReplCopyWork is pointing to table/partition base location.
+    // So, just need to append the base or delta directory.
+    // getDeleteDestIfExist returns true if it is repl load for replace/insert overwrite event and
+    // hence need to create base directory. If false, then it is repl load for regular insert into or
+    // load flow and hence just create delta directory.
+    Long writeId = ReplUtils.getMigrationCurrentTblWriteId(conf);
+    if (writeId == null) {
+      return null;
+    }
+    // Set stmt id 0 for bootstrap load as the directory needs to be searched during incremental load to avoid any
+    // duplicate copy from the source. Check HIVE-21197 for more detail.
+    int stmtId = (writeId.equals(ReplUtils.REPL_BOOTSTRAP_MIGRATION_BASE_WRITE_ID)) ?
+            ReplUtils.REPL_BOOTSTRAP_MIGRATION_BASE_STMT_ID :
+            context.getHiveTxnManager().getStmtIdAndIncrement();
+    return new Path(toPath, AcidUtils.baseOrDeltaSubdir(work.getDeleteDestIfExist(), writeId, writeId, stmtId));
+  }
+
   private List<ReplChangeManager.FileInfo> filesInFileListing(FileSystem fs, Path dataPath)
       throws IOException {
     Path fileListing = new Path(dataPath, EximUtil.FILES_NAME);
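The ReplCopyTask changes above are mostly a refactor: the duplicate-copy check and the base/delta path derivation move into the new isDuplicateCopy and getModifiedToPath helpers, and the same migrated-transactional-table handling now also runs on the directory-listing branch of execute(). For readers unfamiliar with the directory name that getModifiedToPath appends, here is a hedged sketch of ACID-style base/delta naming; the authoritative formatting lives in AcidUtils.baseOrDeltaSubdir, and the zero-pad widths below are assumptions.

// Hedged sketch of the subdirectory getModifiedToPath appends to toPath.
// deleteDestIfExist == true (replace/insert-overwrite load) -> a base dir;
// otherwise (plain insert/load flow) -> a delta dir. Pad widths are assumed.
public class BaseOrDeltaSketch {
  static String baseOrDeltaSubdir(boolean deleteDestIfExist, long writeId, int stmtId) {
    return deleteDestIfExist
        ? String.format("base_%07d", writeId)
        : String.format("delta_%07d_%07d_%04d", writeId, writeId, stmtId);
  }

  public static void main(String[] args) {
    System.out.println(baseOrDeltaSubdir(true, 1, 0));   // base_0000001
    System.out.println(baseOrDeltaSubdir(false, 1, 0));  // delta_0000001_0000001_0000
  }
}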
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
index 7354a3e..80515e4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
@@ -77,6 +77,7 @@ import com.google.common.collect.ImmutableList;
 
 import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
 
+import java.io.File;
 import java.io.IOException;
 import java.io.Serializable;
 import java.net.URI;
@@ -505,9 +506,8 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
 
     Task<?> copyTask = null;
     if (replicationSpec.isInReplicationScope()) {
-      boolean isImport = ReplicationSpec.Type.IMPORT.equals(replicationSpec.getReplSpecType());
       copyTask = ReplCopyTask.getLoadCopyTask(replicationSpec, dataPath, destPath, x.getConf(),
-              isAutoPurge, needRecycle, copyToMigratedTxnTable, !isImport);
+              isAutoPurge, needRecycle, copyToMigratedTxnTable, false);
     } else {
       copyTask = TaskFactory.get(new CopyWork(dataPath, destPath, false));
     }
@@ -597,6 +597,18 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
       return addPartTask;
     } else {
       String srcLocation = partSpec.getLocation();
+      if (replicationSpec.isInReplicationScope()
+          && !ReplicationSpec.Type.IMPORT.equals(replicationSpec.getReplSpecType())) {
+        Path partLocation = new Path(partSpec.getLocation());
+        Path dataDirBase = partLocation.getParent();
+        String bucketDir = partLocation.getName();
+        for (int i=1; i<partSpec.getPartSpec().size(); i++) {
+          bucketDir = dataDirBase.getName() + File.separator + bucketDir;
+          dataDirBase = dataDirBase.getParent();
+        }
+        String relativePartDataPath = EximUtil.DATA_PATH_NAME + File.separator + bucketDir;
+        srcLocation = new Path(dataDirBase, relativePartDataPath).toString();
+      }
       fixLocationInPartSpec(tblDesc, table, wh, replicationSpec, partSpec, x);
       x.getLOG().debug("adding dependent CopyWork/AddPart/MoveWork for partition "
           + partSpecToString(partSpec.getPartSpec())
@@ -641,9 +653,8 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
 
       Task<?> copyTask = null;
       if (replicationSpec.isInReplicationScope()) {
-        boolean isImport = ReplicationSpec.Type.IMPORT.equals(replicationSpec.getReplSpecType());
         copyTask = ReplCopyTask.getLoadCopyTask(replicationSpec, new Path(srcLocation), destPath,
-                x.getConf(), isAutoPurge, needRecycle, copyToMigratedTxnTable, !isImport);
+                x.getConf(), isAutoPurge, needRecycle, copyToMigratedTxnTable, false);
       } else {
         copyTask = TaskFactory.get(new CopyWork(new Path(srcLocation), destPath, false));
       }
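The new block in ImportSemanticAnalyzer rewrites srcLocation for replication (non-IMPORT) loads: starting from the partition's leaf directory, it climbs one parent per additional partition column and then re-roots that partition sub-tree under the dump's data directory. A self-contained walk-through of the same arithmetic, with an assumed two-column partition location:

// Worked example (assumed paths) of the srcLocation rewrite above: with a
// two-column partition, the loop climbs one parent per extra partition column,
// then re-roots the partition sub-tree under the "data" directory.
import java.io.File;
import org.apache.hadoop.fs.Path;

public class PartSrcLocationSketch {
  public static void main(String[] args) {
    Path partLocation = new Path("/dump/hive/24/tbl/b=1/c=2"); // assumed layout
    int partColumns = 2;                                       // columns b and c
    Path dataDirBase = partLocation.getParent();
    String bucketDir = partLocation.getName();
    for (int i = 1; i < partColumns; i++) {
      bucketDir = dataDirBase.getName() + File.separator + bucketDir;
      dataDirBase = dataDirBase.getParent();
    }
    String relativePartDataPath = "data" + File.separator + bucketDir;
    // Prints /dump/hive/24/tbl/data/b=1/c=2
    System.out.println(new Path(dataDirBase, relativePartDataPath));
  }
}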
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java
index fd70260..23cd128 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java
@@ -232,6 +232,39 @@ public class CopyUtils {
     return pathList;
   }
 
+  /* If a file is copied from CM path, then need to rename them using original source file name
+     This is needed to avoid having duplicate files in target if same event is applied twice
+     where the first event refers to source path and second event refers to CM path */
+
+  public void renameFileCopiedFromCmPath(Path toPath, FileSystem dstFs, List<ReplChangeManager.FileInfo> srcFiles)
+          throws IOException {
+    for (ReplChangeManager.FileInfo srcFile : srcFiles) {
+      if (srcFile.isUseSourcePath()) {
+        continue;
+      }
+      String destFileName = srcFile.getCmPath().getName();
+      Path destRoot = CopyUtils.getCopyDestination(srcFile, toPath);
+      Path destFile = new Path(destRoot, destFileName);
+      if (dstFs.exists(destFile)) {
+        String destFileWithSourceName = srcFile.getSourcePath().getName();
+        Path newDestFile = new Path(destRoot, destFileWithSourceName);
+
+        // if the new file exist then delete it before renaming, to avoid rename failure. If the copy is done
+        // directly to table path (bypassing staging directory) then there might be some stale files from previous
+        // incomplete/failed load. No need of recycle as this is a case of stale file.
+        if (dstFs.exists(newDestFile)) {
+          LOG.debug(" file " + newDestFile + " is deleted before renaming");
+          dstFs.delete(newDestFile, true);
+        }
+        boolean result = dstFs.rename(destFile, newDestFile);
+        if (!result) {
+          throw new IllegalStateException(
+                  "could not rename " + destFile.getName() + " to " + newDestFile.getName());
+        }
+      }
+    }
+  }
+
   // Check if the source file unmodified even after copy to see if we copied the right file
   private boolean isSourceFileMismatch(FileSystem sourceFs, ReplChangeManager.FileInfo srcFile) throws IOException {
     // If source is already CM path, the checksum will be always matching
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AbstractEventHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AbstractEventHandler.java
index 049c06b..fed0d50 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AbstractEventHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AbstractEventHandler.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.hive.metastore.messaging.MessageEncoder;
 import org.apache.hadoop.hive.metastore.messaging.MessageFactory;
 import org.apache.hadoop.hive.metastore.messaging.json.JSONMessageEncoder;
 import org.apache.hadoop.hive.ql.metadata.HiveFatalException;
+import org.apache.hadoop.hive.ql.metadata.Partition;
 import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.ql.parse.EximUtil;
 import org.apache.hadoop.hive.ql.parse.repl.CopyUtils;
@@ -37,8 +38,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.security.auth.login.LoginException;
-import java.io.BufferedWriter;
-import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
@@ -89,42 +88,28 @@ abstract class AbstractEventHandler<T extends EventMessage> implements EventHand
     return event.getEventId();
   }
 
-  protected void writeFileEntry(String dbName, Table table, String file, BufferedWriter fileListWriter,
-                                Context withinContext)
+  protected void writeFileEntry(Table table, Partition ptn, String file, Context withinContext)
       throws IOException, LoginException, MetaException, HiveFatalException {
     HiveConf hiveConf = withinContext.hiveConf;
     String distCpDoAsUser = hiveConf.getVar(HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER);
     if (!Utils.shouldDumpMetaDataOnly(withinContext.hiveConf)) {
-      Path dataPath = new Path(withinContext.dumpRoot.toString(), EximUtil.DATA_PATH_NAME);
-      List<ReplChangeManager.FileInfo> filePaths = new ArrayList<>();
+      Path dataPath = new Path(withinContext.eventRoot, EximUtil.DATA_PATH_NAME);
+      if (table.isPartitioned()) {
+        dataPath = new Path(dataPath, ptn.getName());
+      }
       String[] decodedURISplits = ReplChangeManager.decodeFileUri(file);
-      String srcDataFile = decodedURISplits[0];
-      Path srcDataPath = new Path(srcDataFile);
+      Path srcDataPath = new Path(decodedURISplits[0]);
       if (dataPath.toUri().getScheme() == null) {
         dataPath = new Path(srcDataPath.toUri().getScheme(), srcDataPath.toUri().getAuthority(), dataPath.toString());
       }
-      String eventTblPath = event.getEventId() + File.separator + dbName + File.separator + table.getTableName();
-      String srcDataFileRelativePath = null;
-      if (srcDataFile.contains(table.getPath().toString())) {
-        srcDataFileRelativePath = srcDataFile.substring(table.getPath().toString().length() + 1);
-      } else if (decodedURISplits[3] == null) {
-        srcDataFileRelativePath = srcDataPath.getName();
-      } else {
-        srcDataFileRelativePath = srcDataFileRelativePath + File.separator + srcDataPath.getName();
-      }
-      Path targetPath = new Path(dataPath, eventTblPath + File.separator + srcDataFileRelativePath);
-      String encodedTargetPath = ReplChangeManager.encodeFileUri(
-          targetPath.toString(), decodedURISplits[1], decodedURISplits[3]);
-      ReplChangeManager.FileInfo f = ReplChangeManager.getFileInfo(new Path(decodedURISplits[0]),
+      List<ReplChangeManager.FileInfo> filePaths = new ArrayList<>();
+      ReplChangeManager.FileInfo fileInfo = ReplChangeManager.getFileInfo(new Path(decodedURISplits[0]),
           decodedURISplits[1], decodedURISplits[2], decodedURISplits[3], hiveConf);
-      filePaths.add(f);
-      FileSystem dstFs = targetPath.getFileSystem(hiveConf);
-      Path finalTargetPath = targetPath.getParent();
-      if (decodedURISplits[3] != null) {
-        finalTargetPath = finalTargetPath.getParent();
-      }
-      new CopyUtils(distCpDoAsUser, hiveConf, dstFs).copyAndVerify(finalTargetPath, filePaths, srcDataPath);
-      fileListWriter.write(encodedTargetPath + "\n");
+      filePaths.add(fileInfo);
+      FileSystem dstFs = dataPath.getFileSystem(hiveConf);
+      CopyUtils copyUtils = new CopyUtils(distCpDoAsUser, hiveConf, dstFs);
+      copyUtils.copyAndVerify(dataPath, filePaths, srcDataPath);
+      copyUtils.renameFileCopiedFromCmPath(dataPath, dstFs, filePaths);
     }
   }
 }
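With the rewrite above, writeFileEntry derives the copy destination itself and performs the copy at dump time: <eventRoot>/data for unpartitioned tables, <eventRoot>/data/<partitionName> for partitioned ones, followed by the CM-name normalization. A minimal restatement of just the destination choice (EximUtil.DATA_PATH_NAME resolves to "data"; the event root below is an assumed value):

// Mirrors the destination logic of the new writeFileEntry, simplified.
import org.apache.hadoop.fs.Path;

public class EventDataPathSketch {
  static Path dataPathFor(Path eventRoot, String partitionName) {
    Path dataPath = new Path(eventRoot, "data"); // EximUtil.DATA_PATH_NAME
    return (partitionName == null) ? dataPath : new Path(dataPath, partitionName);
  }

  public static void main(String[] args) {
    Path eventRoot = new Path("/dump/hive/24");        // assumed event root
    System.out.println(dataPathFor(eventRoot, null));  // /dump/hive/24/data
    System.out.println(dataPathFor(eventRoot, "b=1")); // /dump/hive/24/data/b=1
  }
}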
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AddPartitionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AddPartitionHandler.java
index a06b90d..8506532 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AddPartitionHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AddPartitionHandler.java
@@ -108,10 +108,8 @@ class AddPartitionHandler extends AbstractEventHandler {
       Iterable<String> files = partitionFilesIter.next().getFiles();
       if (files != null) {
         // encoded filename/checksum of files, write into _files
-        try (BufferedWriter fileListWriter = writer(withinContext, qlPtn)) {
-          for (String file : files) {
-            writeFileEntry(qlMdTable.getDbName(), qlMdTable, file, fileListWriter, withinContext);
-          }
+        for (String file : files) {
+          writeFileEntry(qlMdTable, qlPtn, file, withinContext);
         }
       }
     }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CommitTxnHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CommitTxnHandler.java
index dc87506..36369db 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CommitTxnHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CommitTxnHandler.java
@@ -39,12 +39,9 @@ import org.apache.hadoop.hive.ql.parse.EximUtil;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.parse.repl.DumpType;
 import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData;
-import org.apache.hadoop.fs.FileSystem;
 
 import javax.security.auth.login.LoginException;
-import java.io.BufferedWriter;
 import java.io.IOException;
-import java.io.OutputStreamWriter;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -59,19 +56,11 @@ class CommitTxnHandler extends AbstractEventHandler<CommitTxnMessage> {
     return deserializer.getCommitTxnMessage(stringRepresentation);
   }
 
-  private BufferedWriter writer(Context withinContext, Path dataPath) throws IOException {
-    Path filesPath = new Path(dataPath, EximUtil.FILES_NAME);
-    FileSystem fs = dataPath.getFileSystem(withinContext.hiveConf);
-    return new BufferedWriter(new OutputStreamWriter(fs.create(filesPath)));
-  }
-
-  private void writeDumpFiles(Table qlMdTable, Context withinContext, Iterable<String> files, Path dataPath)
+  private void writeDumpFiles(Table qlMdTable, Partition ptn, Iterable<String> files, Context withinContext)
       throws IOException, LoginException, MetaException, HiveFatalException {
     // encoded filename/checksum of files, write into _files
-    try (BufferedWriter fileListWriter = writer(withinContext, dataPath)) {
-      for (String file : files) {
-        writeFileEntry(qlMdTable.getDbName(), qlMdTable, file, fileListWriter, withinContext);
-      }
+    for (String file : files) {
+      writeFileEntry(qlMdTable, ptn, file, withinContext);
     }
   }
 
@@ -92,12 +81,10 @@ class CommitTxnHandler extends AbstractEventHandler<CommitTxnMessage> {
             withinContext.hiveConf);
 
         if ((null == qlPtns) || qlPtns.isEmpty()) {
-          Path dataPath = new Path(withinContext.eventRoot, EximUtil.DATA_PATH_NAME);
-          writeDumpFiles(qlMdTable, withinContext, fileListArray.get(0), dataPath);
+          writeDumpFiles(qlMdTable, null, fileListArray.get(0), withinContext);
         } else {
           for (int idx = 0; idx < qlPtns.size(); idx++) {
-            Path dataPath = new Path(withinContext.eventRoot, qlPtns.get(idx).getName());
-            writeDumpFiles(qlMdTable, withinContext, fileListArray.get(idx), dataPath);
+            writeDumpFiles(qlMdTable, qlPtns.get(idx), fileListArray.get(idx), withinContext);
           }
         }
       }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateTableHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateTableHandler.java
index 7a6ddf9..c732b21 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateTableHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateTableHandler.java
@@ -79,14 +79,11 @@ class CreateTableHandler extends AbstractEventHandler<CreateTableMessage> {
         withinContext.replicationSpec,
         withinContext.hiveConf);
 
-    Path dataPath = new Path(withinContext.eventRoot, "data");
     Iterable<String> files = eventMessage.getFiles();
 
     if (files != null) {
       // encoded filename/checksum of files, write into _files
-      try (BufferedWriter fileListWriter = writer(withinContext, dataPath)) {
-        for (String file : files) {
-          writeFileEntry(qlMdTable.getDbName(), qlMdTable, file, fileListWriter, withinContext);
-        }
+      for (String file : files) {
+        writeFileEntry(qlMdTable, null, file, withinContext);
       }
     }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/InsertHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/InsertHandler.java
index 4e02620..701dd6b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/InsertHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/InsertHandler.java
@@ -80,25 +80,16 @@ class InsertHandler extends AbstractEventHandler<InsertMessage> {
         withinContext.hiveConf);
     Iterable<String> files = eventMessage.getFiles();
 
+    /*
+     * Insert into/overwrite operation shall operate on one or more partitions or even partitions from multiple tables.
+     * But, Insert event is generated for each partition to which the data is inserted.
+     * So, qlPtns list will have only one entry.
+     */
+    Partition ptn = (null == qlPtns || qlPtns.isEmpty()) ? null : qlPtns.get(0);
     if (files != null) {
-      Path dataPath;
-      if ((null == qlPtns) || qlPtns.isEmpty()) {
-        dataPath = new Path(withinContext.eventRoot, EximUtil.DATA_PATH_NAME);
-      } else {
-        /*
-         * Insert into/overwrite operation shall operate on one or more partitions or even partitions from multiple
-         * tables. But, Insert event is generated for each partition to which the data is inserted. So, qlPtns list
-         * will have only one entry.
-         */
-        assert(1 == qlPtns.size());
-        dataPath = new Path(withinContext.eventRoot, qlPtns.get(0).getName());
-      }
-      // encoded filename/checksum of files, write into _files
-      try (BufferedWriter fileListWriter = writer(withinContext, dataPath)) {
-        for (String file : files) {
-          writeFileEntry(qlMdTable.getDbName(), qlMdTable, file, fileListWriter, withinContext);
-        }
+      for (String file : files) {
+        writeFileEntry(qlMdTable, ptn, file, withinContext);
       }
     }
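Net effect, and what the updated tests assert: each event directory in the dump now carries its data under data/[<partition>/]<file>, and no _files listing is written for these events. A hedged end-to-end check against a local filesystem; the event id and file names below are made up:

// Hedged layout check (assumed event id and file names, local FS for illustration).
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class DumpLayoutCheck {
  public static void main(String[] args) throws Exception {
    FileSystem fs = FileSystem.getLocal(new Configuration());
    Path eventDir = new Path("/tmp/dump/hive/24");  // assumed event directory
    // Expected true after a dump: the bucket file sits under data/<partition>/
    System.out.println(fs.exists(new Path(eventDir, "data/b=1/000000_0")));
    // Expected false: no _files listing is written any more for these events
    System.out.println(fs.exists(new Path(eventDir, "data/_files")));
  }
}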