This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 1e90f14 Fix bug when importing files with the same name in different directories (#8337)
1e90f14 is described below
commit 1e90f141282e40f819de806920cc2a836e0e35ba
Author: Mark Needham <[email protected]>
AuthorDate: Tue Mar 22 18:35:23 2022 +0000
Fix bug when importing files with the same name in different directories (#8337)
---
.../standalone/SegmentGenerationJobRunner.java | 8 ++-
.../standalone/SegmentGenerationJobRunnerTest.java | 84 ++++++++++++++++++++++
2 files changed, 91 insertions(+), 1 deletion(-)
diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/batch/standalone/SegmentGenerationJobRunner.java b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/batch/standalone/SegmentGenerationJobRunner.java
index 86480ec..7c84945 100644
--- a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/batch/standalone/SegmentGenerationJobRunner.java
+++ b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/batch/standalone/SegmentGenerationJobRunner.java
@@ -238,7 +238,7 @@ public class SegmentGenerationJobRunner implements IngestionJobRunner {
FileUtils.forceMkdir(localOutputTempDir);
//copy input path to local
- File localInputDataFile = new File(localInputTempDir, new
File(inputFileURI.getPath()).getName());
+ File localInputDataFile = createLocalInputDateFile(inputFileURI,
localInputTempDir);
_inputDirFS.copyToLocalFile(inputFileURI, localInputDataFile);
//create task spec
@@ -290,4 +290,10 @@ public class SegmentGenerationJobRunner implements IngestionJobRunner {
}
});
}
+
+ private File createLocalInputDateFile(URI inputFileURI, File
localInputTempDir) {
+ String inputFileURIPath = inputFileURI.getPath();
+ File localInputFileDir = new File(localInputTempDir,
UUID.randomUUID().toString());
+ return new File(localInputFileDir, new File(inputFileURIPath).getName());
+ }
}
diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/test/java/org/apache/pinot/plugin/ingestion/batch/standalone/SegmentGenerationJobRunnerTest.java b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/test/java/org/apache/pinot/plugin/ingestion/batch/standalone/SegmentGenerationJobRunnerTest.java
index 1f920c4..2e39c43 100644
--- a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/test/java/org/apache/pinot/plugin/ingestion/batch/standalone/SegmentGenerationJobRunnerTest.java
+++ b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/test/java/org/apache/pinot/plugin/ingestion/batch/standalone/SegmentGenerationJobRunnerTest.java
@@ -139,4 +139,88 @@ public class SegmentGenerationJobRunnerTest {
// FUTURE - validate contents of file?
}
+
+ @Test
+ public void testInputFilesWithSameNameInDifferentDirectories()
+ throws Exception {
+ File testDir =
Files.createTempDirectory("testSegmentGeneration-").toFile();
+ testDir.delete();
+ testDir.mkdirs();
+
+ File inputDir = new File(testDir, "input");
+ File inputSubDir1 = new File(inputDir, "2009");
+ File inputSubDir2 = new File(inputDir, "2010");
+ inputSubDir1.mkdirs();
+ inputSubDir2.mkdirs();
+
+ File inputFile1 = new File(inputSubDir1, "input.csv");
+ FileUtils.writeLines(inputFile1, Lists.newArrayList("col1,col2",
"value1,1", "value2,2"));
+
+ File inputFile2 = new File(inputSubDir2, "input.csv");
+ FileUtils.writeLines(inputFile2, Lists.newArrayList("col1,col2",
"value3,3", "value4,4"));
+
+ File outputDir = new File(testDir, "output");
+
+ // Set up schema file.
+ final String schemaName = "mySchema";
+ File schemaFile = new File(testDir, "schema");
+ Schema schema = new SchemaBuilder()
+ .setSchemaName(schemaName)
+ .addSingleValueDimension("col1", DataType.STRING)
+ .addMetric("col2", DataType.INT)
+ .build();
+ FileUtils.write(schemaFile, schema.toPrettyJsonString(),
StandardCharsets.UTF_8);
+
+ // Set up table config file.
+ File tableConfigFile = new File(testDir, "tableConfig");
+ TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE)
+ .setTableName("myTable")
+ .setSchemaName(schemaName)
+ .setNumReplicas(1)
+ .build();
+ FileUtils.write(tableConfigFile, tableConfig.toJsonString(),
StandardCharsets.UTF_8);
+
+ SegmentGenerationJobSpec jobSpec = new SegmentGenerationJobSpec();
+ jobSpec.setJobType("SegmentCreation");
+ jobSpec.setInputDirURI(inputDir.toURI().toString());
+ jobSpec.setOutputDirURI(outputDir.toURI().toString());
+ jobSpec.setOverwriteOutput(true);
+
+ RecordReaderSpec recordReaderSpec = new RecordReaderSpec();
+ recordReaderSpec.setDataFormat("csv");
+ recordReaderSpec.setClassName(CSVRecordReader.class.getName());
+ recordReaderSpec.setConfigClassName(CSVRecordReaderConfig.class.getName());
+ jobSpec.setRecordReaderSpec(recordReaderSpec);
+
+ TableSpec tableSpec = new TableSpec();
+ tableSpec.setTableName("myTable");
+ tableSpec.setSchemaURI(schemaFile.toURI().toString());
+ tableSpec.setTableConfigURI(tableConfigFile.toURI().toString());
+ jobSpec.setTableSpec(tableSpec);
+
+ ExecutionFrameworkSpec efSpec = new ExecutionFrameworkSpec();
+ efSpec.setName("standalone");
+
efSpec.setSegmentGenerationJobRunnerClassName(SegmentGenerationJobRunner.class.getName());
+ jobSpec.setExecutionFrameworkSpec(efSpec);
+
+ PinotFSSpec pfsSpec = new PinotFSSpec();
+ pfsSpec.setScheme("file");
+ pfsSpec.setClassName(LocalPinotFS.class.getName());
+ jobSpec.setPinotFSSpecs(Collections.singletonList(pfsSpec));
+
+ SegmentGenerationJobRunner jobRunner = new
SegmentGenerationJobRunner(jobSpec);
+ jobRunner.run();
+
+ // Check that both segment files are created
+
+ File newSegmentFile2009 = new File(outputDir,
"2009/myTable_OFFLINE_0.tar.gz");
+ Assert.assertTrue(newSegmentFile2009.exists());
+ Assert.assertTrue(newSegmentFile2009.isFile());
+ Assert.assertTrue(newSegmentFile2009.length() > 0);
+
+ File newSegmentFile2010 = new File(outputDir,
"2010/myTable_OFFLINE_0.tar.gz");
+ Assert.assertTrue(newSegmentFile2010.exists());
+ Assert.assertTrue(newSegmentFile2010.isFile());
+ Assert.assertTrue(newSegmentFile2010.length() > 0);
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]