kkrugler commented on a change in pull request #6506:
URL: https://github.com/apache/incubator-pinot/pull/6506#discussion_r568254617
##########
File path:
pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentGenerationJobRunner.java
##########
@@ -208,11 +220,12 @@ public void run()
} else {
LOGGER.info("Creating segments with data files: {}", filteredFiles);
for (int i = 0; i < numDataFiles; i++) {
- String dataFilePath = filteredFiles.get(i);
-
- File localFile = new File("tmp");
+ // Typically PinotFS implementations list files without a protocol, so
we lose (for example) the
+ // hdfs:// portion of the path. Call getFileURI() to fix this up.
+ URI inputFileURI =
SegmentGenerationUtils.getFileURI(filteredFiles.get(i), inputDirURI);
Review comment:
It should be fixed in Standalone. I could file an issue for Spark, lmk.
##########
File path:
pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentGenerationJobRunner.java
##########
@@ -269,33 +300,80 @@ public void run()
throw new RuntimeException("Job failed: " + job);
}
- LOGGER.info("Trying to copy segment tars from staging directory: [{}] to
output directory [{}]", stagingDirURI,
+ LOGGER.info("Moving segment tars from staging directory [{}] to output
directory [{}]", stagingDirURI,
outputDirURI);
- outputDirFS.copy(new Path(stagingDir, SEGMENT_TAR_DIR).toUri(),
outputDirURI);
+ moveFiles(outputDirFS, new Path(stagingDir,
SEGMENT_TAR_SUBDIR_NAME).toUri(), outputDirURI, _spec.isOverwriteOutput());
} finally {
LOGGER.info("Trying to clean up staging directory: [{}]", stagingDirURI);
outputDirFS.delete(stagingDirURI, true);
}
}
+ /**
+ * Move all files from the <sourceDir> to the <destDir>, but don't delete
existing contents of destDir.
+ * If <overwrite> is true, and the source file exists in the destination
directory, then replace it, otherwise
+ * log a warning and continue. We assume that source and destination
directories are on the same filesystem,
+ * so that move() can be used.
+ *
+ * @param fs
+ * @param sourceDir
+ * @param destDir
+ * @param overwrite
+ * @throws IOException
+ * @throws URISyntaxException
+ */
+ private void moveFiles(PinotFS fs, URI sourceDir, URI destDir, boolean
overwrite) throws IOException, URISyntaxException {
+ for (String sourcePath : fs.listFiles(sourceDir, true)) {
+ URI sourceFileUri = SegmentGenerationUtils.getFileURI(sourcePath,
sourceDir);
+ String sourceFilename = FilenameUtils.getName(sourceFileUri.getPath());
+ URI destFileUri =
SegmentGenerationUtils.getRelativeOutputPath(sourceDir, sourceFileUri,
destDir).resolve(sourceFilename);
+
+ if (!overwrite && fs.exists(destFileUri)) {
+ LOGGER.warn("Can't overwrite existing output segment tar file: {}",
destFileUri);
Review comment:
Good idea, let me look into that
##########
File path:
pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentGenerationJobRunner.java
##########
@@ -208,11 +220,12 @@ public void run()
} else {
LOGGER.info("Creating segments with data files: {}", filteredFiles);
for (int i = 0; i < numDataFiles; i++) {
- String dataFilePath = filteredFiles.get(i);
-
- File localFile = new File("tmp");
+ // Typically PinotFS implementations list files without a protocol, so
we lose (for example) the
+ // hdfs:// portion of the path. Call getFileURI() to fix this up.
+ URI inputFileURI =
SegmentGenerationUtils.getFileURI(filteredFiles.get(i), inputDirURI);
Review comment:
Hi @fx19880617 I think I fixed this in Standalone a while back (that's
what I meant by "It should be fixed in Standalone...", sorry for not being more
clear.
##########
File path:
pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/test/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentGenerationUtilsTest.java
##########
@@ -53,4 +53,31 @@ public void testRelativeURIs() throws URISyntaxException {
"hdfs://namenode2/output/dir/subdir/file.tar.gz");
}
+ // Don't lose authority portion of inputDirURI when creating output files
+ // https://github.com/apache/incubator-pinot/issues/6355
+
+ @Test
+ public void testGetFileURI() throws Exception {
+ // Typical file URI
+ validateFileURI(new URI("file:/path/to/"));
+
+ // Namenode as authority, plus non-standard port
+ validateFileURI(new URI("hdfs://namenode:9999/path/to/"));
+
+ // S3 bucket + path
+ validateFileURI(new URI("s3://bucket/path/to/"));
+
+ // S3 URI with userInfo (username/password)
+ validateFileURI(new URI("s3://username:password@bucket/path/to/"));
+ }
+
+ private void validateFileURI(URI directoryURI) throws URISyntaxException {
+ URI fileURI = new URI(directoryURI.toString() + "file");
+ String rawPath = fileURI.getRawPath();
+
+ Assert.assertEquals(SegmentGenerationUtils.getFileURI(rawPath,
fileURI).toString(),
Review comment:
Hi @fx19880617 - there was a test, for `file:/path/to/`. But thanks for
asking, as I added more tests for different types of file URIs, and found an
issue in the `getFileURI()` method with `file:///path/to/`. Plus I learned
something about file URIs :)
##########
File path:
pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentGenerationJobRunner.java
##########
@@ -269,33 +300,80 @@ public void run()
throw new RuntimeException("Job failed: " + job);
}
- LOGGER.info("Trying to copy segment tars from staging directory: [{}] to
output directory [{}]", stagingDirURI,
+ LOGGER.info("Moving segment tars from staging directory [{}] to output
directory [{}]", stagingDirURI,
outputDirURI);
- outputDirFS.copy(new Path(stagingDir, SEGMENT_TAR_DIR).toUri(),
outputDirURI);
+ moveFiles(outputDirFS, new Path(stagingDir,
SEGMENT_TAR_SUBDIR_NAME).toUri(), outputDirURI, _spec.isOverwriteOutput());
} finally {
LOGGER.info("Trying to clean up staging directory: [{}]", stagingDirURI);
outputDirFS.delete(stagingDirURI, true);
}
}
+ /**
+ * Move all files from the <sourceDir> to the <destDir>, but don't delete
existing contents of destDir.
+ * If <overwrite> is true, and the source file exists in the destination
directory, then replace it, otherwise
+ * log a warning and continue. We assume that source and destination
directories are on the same filesystem,
+ * so that move() can be used.
+ *
+ * @param fs
+ * @param sourceDir
+ * @param destDir
+ * @param overwrite
+ * @throws IOException
+ * @throws URISyntaxException
+ */
+ private void moveFiles(PinotFS fs, URI sourceDir, URI destDir, boolean
overwrite) throws IOException, URISyntaxException {
+ for (String sourcePath : fs.listFiles(sourceDir, true)) {
+ URI sourceFileUri = SegmentGenerationUtils.getFileURI(sourcePath,
sourceDir);
+ String sourceFilename = FilenameUtils.getName(sourceFileUri.getPath());
+ URI destFileUri =
SegmentGenerationUtils.getRelativeOutputPath(sourceDir, sourceFileUri,
destDir).resolve(sourceFilename);
+
+ if (!overwrite && fs.exists(destFileUri)) {
+ LOGGER.warn("Can't overwrite existing output segment tar file: {}",
destFileUri);
Review comment:
Unfortunately there's no way to do that before generating the segments,
as the segment names (can) depend on the data used to generate the segment, and
that's what we need to check for collision in the output directory.
##########
File path:
pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentGenerationJobRunner.java
##########
@@ -269,33 +300,80 @@ public void run()
throw new RuntimeException("Job failed: " + job);
}
- LOGGER.info("Trying to copy segment tars from staging directory: [{}] to
output directory [{}]", stagingDirURI,
+ LOGGER.info("Moving segment tars from staging directory [{}] to output
directory [{}]", stagingDirURI,
outputDirURI);
- outputDirFS.copy(new Path(stagingDir, SEGMENT_TAR_DIR).toUri(),
outputDirURI);
+ moveFiles(outputDirFS, new Path(stagingDir,
SEGMENT_TAR_SUBDIR_NAME).toUri(), outputDirURI, _spec.isOverwriteOutput());
} finally {
LOGGER.info("Trying to clean up staging directory: [{}]", stagingDirURI);
outputDirFS.delete(stagingDirURI, true);
}
}
+ /**
+ * Move all files from the <sourceDir> to the <destDir>, but don't delete
existing contents of destDir.
+ * If <overwrite> is true, and the source file exists in the destination
directory, then replace it, otherwise
+ * log a warning and continue. We assume that source and destination
directories are on the same filesystem,
+ * so that move() can be used.
+ *
+ * @param fs
+ * @param sourceDir
+ * @param destDir
+ * @param overwrite
+ * @throws IOException
+ * @throws URISyntaxException
+ */
+ private void moveFiles(PinotFS fs, URI sourceDir, URI destDir, boolean
overwrite) throws IOException, URISyntaxException {
+ for (String sourcePath : fs.listFiles(sourceDir, true)) {
+ URI sourceFileUri = SegmentGenerationUtils.getFileURI(sourcePath,
sourceDir);
+ String sourceFilename = FilenameUtils.getName(sourceFileUri.getPath());
+ URI destFileUri =
SegmentGenerationUtils.getRelativeOutputPath(sourceDir, sourceFileUri,
destDir).resolve(sourceFilename);
+
+ if (!overwrite && fs.exists(destFileUri)) {
+ LOGGER.warn("Can't overwrite existing output segment tar file: {}",
destFileUri);
+ } else {
+ fs.move(sourceFileUri, destFileUri, true);
+ }
+ }
+ }
+
/**
* Can be overridden to plug in custom mapper.
*/
protected Class<? extends Mapper<LongWritable, Text, LongWritable, Text>>
getMapperClass() {
return HadoopSegmentCreationMapper.class;
}
- protected void packPluginsToDistributedCache(Job job) {
+ /**
+ * We have to put our jar (which contains the mapper) in the distributed
cache and add it to the classpath,
+ * as otherwise it's not available (since the pinot-all jar - which is
bigger - is what we've set as our job jar).
+ *
+ * @param job
+ * @param outputDirFS
+ * @param stagingDirURI
+ * @throws Exception
+ */
+ protected void addMapperJarToDistributedCache(Job job, PinotFS outputDirFS,
URI stagingDirURI) throws Exception {
+ File ourJar = new
File(getClass().getProtectionDomain().getCodeSource().getLocation().toURI());
+ Path distributedCacheJar = new Path(stagingDirURI.toString(),
ourJar.getName());
+ outputDirFS.copyFromLocalFile(ourJar, distributedCacheJar.toUri());
+ job.addFileToClassPath(distributedCacheJar);
+ }
+
+ protected void packPluginsToDistributedCache(Job job, PinotFS outputDirFS,
URI stagingDirURI) {
File pluginsRootDir = new File(PluginManager.get().getPluginsRootDir());
if (pluginsRootDir.exists()) {
- File pluginsTarGzFile = new File(PINOT_PLUGINS_TAR_GZ);
try {
+ File pluginsTarGzFile = File.createTempFile("pinot-plugins-",
".tar.gz");
TarGzCompressionUtils.createTarGzFile(pluginsRootDir,
pluginsTarGzFile);
- } catch (IOException e) {
+
+ // Copy to staging directory
+ Path cachedPluginsTarball = new Path(stagingDirURI.toString(),
SegmentGenerationUtils.PINOT_PLUGINS_TAR_GZ);
+ outputDirFS.copyFromLocalFile(pluginsTarGzFile,
cachedPluginsTarball.toUri());
+ job.addCacheFile(cachedPluginsTarball.toUri());
+ } catch (Exception e) {
LOGGER.error("Failed to tar plugins directory", e);
Review comment:
Done
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]