This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 129ec54302 Fix segment generation error handling (#8812)
129ec54302 is described below
commit 129ec543028da41c6e634b9bc4369924048082b2
Author: Ken Krugler <[email protected]>
AuthorDate: Wed Jun 8 10:38:19 2022 -0700
Fix segment generation error handling (#8812)
---
.../standalone/SegmentGenerationJobRunner.java | 37 +++-
.../standalone/SegmentGenerationJobRunnerTest.java | 187 ++++++++++++---------
.../v0_deprecated/pinot-spark/pom.xml | 16 +-
.../pinot-stream-ingestion/pinot-kinesis/pom.xml | 8 +
pom.xml | 9 +-
5 files changed, 172 insertions(+), 85 deletions(-)
diff --git
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/batch/standalone/SegmentGenerationJobRunner.java
b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/batch/standalone/SegmentGenerationJobRunner.java
index 7c849451e0..83183de050 100644
---
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/batch/standalone/SegmentGenerationJobRunner.java
+++
b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/batch/standalone/SegmentGenerationJobRunner.java
@@ -35,6 +35,7 @@ import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.io.FileUtils;
import org.apache.pinot.common.segment.generation.SegmentGenerationUtils;
import org.apache.pinot.common.utils.TarGzCompressionUtils;
@@ -70,6 +71,7 @@ public class SegmentGenerationJobRunner implements
IngestionJobRunner {
private CountDownLatch _segmentCreationTaskCountDownLatch;
private Schema _schema;
private TableConfig _tableConfig;
+ private AtomicReference<Exception> _failure;
public SegmentGenerationJobRunner() {
}
@@ -160,15 +162,22 @@ public class SegmentGenerationJobRunner implements
IngestionJobRunner {
LOGGER.info("Creating an executor service with {} threads(Job parallelism: {}, available cores: {}.)", numThreads,
jobParallelism, Runtime.getRuntime().availableProcessors());
_executorService = Executors.newFixedThreadPool(numThreads);
+
+ // Currently we're only saving the first failure, as fast fail is consistent with
+ // how the distributed batch (Hadoop/Spark) workflows act today.
+ _failure = new AtomicReference<>();
}
@Override
public void run()
throws Exception {
- //Get list of files to process
+ // Get list of files to process.
String[] files = _inputDirFS.listFiles(_inputDirURI, true);
- //TODO: sort input files based on creation time
+ // TODO - sort input files by modification timestamp. Though this is problematic because:
+ // a. It can put more load on the external filesystem (e.g. S3), and
+ // b. The call to Collections.sort(siblingFiles) below will reorder files by name.
+
List<String> filteredFiles = new ArrayList<>();
PathMatcher includeFilePathMatcher = null;
if (_spec.getIncludeFileNamePattern() != null) {
@@ -194,6 +203,7 @@ public class SegmentGenerationJobRunner implements
IngestionJobRunner {
filteredFiles.add(file);
}
}
+
File localTempDir = new File(FileUtils.getTempDirectory(), "pinot-" +
UUID.randomUUID());
try {
int numInputFiles = filteredFiles.size();
@@ -222,6 +232,11 @@ public class SegmentGenerationJobRunner implements
IngestionJobRunner {
}
}
_segmentCreationTaskCountDownLatch.await();
+
+ if (_failure.get() != null) {
+ _executorService.shutdownNow();
+ throw _failure.get();
+ }
} finally {
//clean up
FileUtils.deleteQuietly(localTempDir);
@@ -231,6 +246,7 @@ public class SegmentGenerationJobRunner implements
IngestionJobRunner {
private void submitSegmentGenTask(File localTempDir, URI inputFileURI, int
seqId)
throws Exception {
+
//create localTempDir for input and output
File localInputTempDir = new File(localTempDir, "input");
FileUtils.forceMkdir(localInputTempDir);
@@ -253,6 +269,14 @@ public class SegmentGenerationJobRunner implements
IngestionJobRunner {
taskSpec.setFailOnEmptySegment(_spec.isFailOnEmptySegment());
taskSpec.setCustomProperty(BatchConfigProperties.INPUT_DATA_FILE_URI_KEY,
inputFileURI.toString());
+ // If there's already been a failure, log and skip this file. Do this check right before the
+ // submit to reduce odds of starting a new segment when a failure is recorded right before the
+ // submit.
+ if (_failure.get() != null) {
+ LOGGER.info("Skipping Segment Generation Task for {} due to previous failures", inputFileURI);
+ return;
+ }
+
LOGGER.info("Submitting one Segment Generation Task for {}", inputFileURI);
_executorService.submit(() -> {
File localSegmentDir = null;
@@ -281,7 +305,14 @@ public class SegmentGenerationJobRunner implements
IngestionJobRunner {
_outputDirFS.copyFromLocalFile(localSegmentTarFile,
outputSegmentTarURI);
}
} catch (Exception e) {
- LOGGER.error("Failed to generate Pinot segment for file - {}", inputFileURI, e);
+ String msg = "Failed to generate Pinot segment for file - " + inputFileURI.toString();
+ _failure.compareAndSet(null, new RuntimeException(msg, e));
+
+ // We have to decrement the latch by the number of pending tasks.
+ long count = _segmentCreationTaskCountDownLatch.getCount();
+ for (int i = 0; i < count; i++) {
+ _segmentCreationTaskCountDownLatch.countDown();
+ }
} finally {
_segmentCreationTaskCountDownLatch.countDown();
FileUtils.deleteQuietly(localSegmentDir);
diff --git
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/test/java/org/apache/pinot/plugin/ingestion/batch/standalone/SegmentGenerationJobRunnerTest.java
b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/test/java/org/apache/pinot/plugin/ingestion/batch/standalone/SegmentGenerationJobRunnerTest.java
index 2e39c43975..cd506fba2c 100644
---
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/test/java/org/apache/pinot/plugin/ingestion/batch/standalone/SegmentGenerationJobRunnerTest.java
+++
b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/test/java/org/apache/pinot/plugin/ingestion/batch/standalone/SegmentGenerationJobRunnerTest.java
@@ -20,10 +20,13 @@ package org.apache.pinot.plugin.ingestion.batch.standalone;
import com.google.common.collect.Lists;
import java.io.File;
+import java.io.FilenameFilter;
+import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.util.Collections;
import org.apache.commons.io.FileUtils;
+import
org.apache.pinot.plugin.ingestion.batch.common.SegmentGenerationTaskRunner;
import org.apache.pinot.plugin.inputformat.csv.CSVRecordReader;
import org.apache.pinot.plugin.inputformat.csv.CSVRecordReaderConfig;
import org.apache.pinot.spi.config.table.TableConfig;
@@ -36,11 +39,16 @@ import
org.apache.pinot.spi.ingestion.batch.spec.ExecutionFrameworkSpec;
import org.apache.pinot.spi.ingestion.batch.spec.PinotFSSpec;
import org.apache.pinot.spi.ingestion.batch.spec.RecordReaderSpec;
import org.apache.pinot.spi.ingestion.batch.spec.SegmentGenerationJobSpec;
+import org.apache.pinot.spi.ingestion.batch.spec.SegmentNameGeneratorSpec;
import org.apache.pinot.spi.ingestion.batch.spec.TableSpec;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
import org.testng.Assert;
import org.testng.annotations.Test;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+
public class SegmentGenerationJobRunnerTest {
@Test
@@ -48,10 +56,7 @@ public class SegmentGenerationJobRunnerTest {
// TODO use common resource definitions & code shared with Hadoop unit
test.
// So probably need a pinot-batch-ingestion-common tests jar that we
depend on.
- File testDir =
Files.createTempDirectory("testSegmentGeneration-").toFile();
- testDir.delete();
- testDir.mkdirs();
-
+ File testDir = makeTestDir();
File inputDir = new File(testDir, "input");
inputDir.mkdirs();
File inputFile = new File(inputDir, "input.csv");
@@ -65,53 +70,11 @@ public class SegmentGenerationJobRunnerTest {
FileUtils.touch(new File(outputDir, outputFilename));
FileUtils.touch(new File(outputDir, existingFilename));
- // Set up schema file.
final String schemaName = "mySchema";
- File schemaFile = new File(testDir, "schema");
- Schema schema = new SchemaBuilder()
- .setSchemaName(schemaName)
- .addSingleValueDimension("col1", DataType.STRING)
- .addMetric("col2", DataType.INT)
- .build();
- FileUtils.write(schemaFile, schema.toPrettyJsonString(),
StandardCharsets.UTF_8);
-
- // Set up table config file.
- File tableConfigFile = new File(testDir, "tableConfig");
- TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE)
- .setTableName("myTable")
- .setSchemaName(schemaName)
- .setNumReplicas(1)
- .build();
- FileUtils.write(tableConfigFile, tableConfig.toJsonString(),
StandardCharsets.UTF_8);
-
- SegmentGenerationJobSpec jobSpec = new SegmentGenerationJobSpec();
- jobSpec.setJobType("SegmentCreation");
- jobSpec.setInputDirURI(inputDir.toURI().toString());
- jobSpec.setOutputDirURI(outputDir.toURI().toString());
+ File schemaFile = makeSchemaFile(testDir, schemaName);
+ File tableConfigFile = makeTableConfigFile(testDir, schemaName);
+ SegmentGenerationJobSpec jobSpec = makeJobSpec(inputDir, outputDir,
schemaFile, tableConfigFile);
jobSpec.setOverwriteOutput(false);
-
- RecordReaderSpec recordReaderSpec = new RecordReaderSpec();
- recordReaderSpec.setDataFormat("csv");
- recordReaderSpec.setClassName(CSVRecordReader.class.getName());
- recordReaderSpec.setConfigClassName(CSVRecordReaderConfig.class.getName());
- jobSpec.setRecordReaderSpec(recordReaderSpec);
-
- TableSpec tableSpec = new TableSpec();
- tableSpec.setTableName("myTable");
- tableSpec.setSchemaURI(schemaFile.toURI().toString());
- tableSpec.setTableConfigURI(tableConfigFile.toURI().toString());
- jobSpec.setTableSpec(tableSpec);
-
- ExecutionFrameworkSpec efSpec = new ExecutionFrameworkSpec();
- efSpec.setName("standalone");
-
efSpec.setSegmentGenerationJobRunnerClassName(SegmentGenerationJobRunner.class.getName());
- jobSpec.setExecutionFrameworkSpec(efSpec);
-
- PinotFSSpec pfsSpec = new PinotFSSpec();
- pfsSpec.setScheme("file");
- pfsSpec.setClassName(LocalPinotFS.class.getName());
- jobSpec.setPinotFSSpecs(Collections.singletonList(pfsSpec));
-
SegmentGenerationJobRunner jobRunner = new
SegmentGenerationJobRunner(jobSpec);
jobRunner.run();
@@ -143,10 +106,7 @@ public class SegmentGenerationJobRunnerTest {
@Test
public void testInputFilesWithSameNameInDifferentDirectories()
throws Exception {
- File testDir =
Files.createTempDirectory("testSegmentGeneration-").toFile();
- testDir.delete();
- testDir.mkdirs();
-
+ File testDir = makeTestDir();
File inputDir = new File(testDir, "input");
File inputSubDir1 = new File(inputDir, "2009");
File inputSubDir2 = new File(inputDir, "2010");
@@ -161,25 +121,111 @@ public class SegmentGenerationJobRunnerTest {
File outputDir = new File(testDir, "output");
- // Set up schema file.
final String schemaName = "mySchema";
+ File schemaFile = makeSchemaFile(testDir, schemaName);
+ File tableConfigFile = makeTableConfigFile(testDir, schemaName);
+ SegmentGenerationJobSpec jobSpec = makeJobSpec(inputDir, outputDir,
schemaFile, tableConfigFile);
+ SegmentGenerationJobRunner jobRunner = new
SegmentGenerationJobRunner(jobSpec);
+ jobRunner.run();
+
+ // Check that both segment files are created
+
+ File newSegmentFile2009 = new File(outputDir,
"2009/myTable_OFFLINE_0.tar.gz");
+ Assert.assertTrue(newSegmentFile2009.exists());
+ Assert.assertTrue(newSegmentFile2009.isFile());
+ Assert.assertTrue(newSegmentFile2009.length() > 0);
+
+ File newSegmentFile2010 = new File(outputDir,
"2010/myTable_OFFLINE_0.tar.gz");
+ Assert.assertTrue(newSegmentFile2010.exists());
+ Assert.assertTrue(newSegmentFile2010.isFile());
+ Assert.assertTrue(newSegmentFile2010.length() > 0);
+ }
+
+ @Test
+ public void testFailureHandling()
+ throws Exception {
+ File testDir = makeTestDir();
+ File inputDir = new File(testDir, "input");
+ inputDir.mkdirs();
+
+ File inputFile1 = new File(inputDir, "input1.csv");
+ FileUtils.writeLines(inputFile1, Lists.newArrayList("col1,col2",
"value11,11", "value12,12"));
+
+ File inputFile2 = new File(inputDir, "input2.csv");
+ FileUtils.writeLines(inputFile2, Lists.newArrayList("col1,col2",
"value21,notanint", "value22,22"));
+
+ File inputFile3 = new File(inputDir, "input3.csv");
+ FileUtils.writeLines(inputFile3, Lists.newArrayList("col1,col2",
"value31,31", "value32,32"));
+
+ File outputDir = new File(testDir, "output");
+
+ final String schemaName = "mySchema";
+ File schemaFile = makeSchemaFile(testDir, schemaName);
+ File tableConfigFile = makeTableConfigFile(testDir, schemaName);
+ SegmentGenerationJobSpec jobSpec = makeJobSpec(inputDir, outputDir,
schemaFile, tableConfigFile);
+
+ // Set up for a segment name that matches our input filename, so we can
validate
+ // that only the first input file gets processed.
+ SegmentNameGeneratorSpec nameSpec = new SegmentNameGeneratorSpec();
+
nameSpec.setType(SegmentGenerationTaskRunner.INPUT_FILE_SEGMENT_NAME_GENERATOR);
+ nameSpec.getConfigs().put(SegmentGenerationTaskRunner.FILE_PATH_PATTERN,
".+/(.+)\\.csv");
+
nameSpec.getConfigs().put(SegmentGenerationTaskRunner.SEGMENT_NAME_TEMPLATE,
"${filePathPattern:\\1}");
+ jobSpec.setSegmentNameGeneratorSpec(nameSpec);
+
+ try {
+ SegmentGenerationJobRunner jobRunner = new
SegmentGenerationJobRunner(jobSpec);
+ jobRunner.run();
+ fail("Job should have failed");
+ } catch (Exception e) {
+ assertTrue(e.getMessage().contains("input2.csv"), "Didn't find filename in exception message");
+
+ // We should only have one output file, since segment generation will
+ // terminate after the second input file.
+ File[] segments = outputDir.listFiles(new FilenameFilter() {
+
+ @Override
+ public boolean accept(File dir, String name) {
+ return name.endsWith(".tar.gz");
+ }
+ });
+
+ // We rely on the SegmentGenerationJobRunner doing a sort by name, so "input1.csv" will be the
+ // first file we process, and "input2.csv" (the bad file) will be the second one.
+ assertEquals(segments.length, 1);
+ assertTrue(segments[0].getName().endsWith("input1.tar.gz"));
+ }
+ }
+
+ private File makeTestDir() throws IOException {
+ File testDir =
Files.createTempDirectory("testSegmentGeneration-").toFile();
+ testDir.delete();
+ testDir.mkdirs();
+ return testDir;
+ }
+
+ private File makeSchemaFile(File testDir, String schemaName) throws
IOException {
File schemaFile = new File(testDir, "schema");
Schema schema = new SchemaBuilder()
- .setSchemaName(schemaName)
- .addSingleValueDimension("col1", DataType.STRING)
- .addMetric("col2", DataType.INT)
- .build();
+ .setSchemaName(schemaName)
+ .addSingleValueDimension("col1", DataType.STRING)
+ .addMetric("col2", DataType.INT)
+ .build();
FileUtils.write(schemaFile, schema.toPrettyJsonString(),
StandardCharsets.UTF_8);
+ return schemaFile;
+ }
- // Set up table config file.
+ private File makeTableConfigFile(File testDir, String schemaName) throws
IOException {
File tableConfigFile = new File(testDir, "tableConfig");
TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE)
- .setTableName("myTable")
- .setSchemaName(schemaName)
- .setNumReplicas(1)
- .build();
+ .setTableName("myTable")
+ .setSchemaName(schemaName)
+ .setNumReplicas(1)
+ .build();
FileUtils.write(tableConfigFile, tableConfig.toJsonString(),
StandardCharsets.UTF_8);
+ return tableConfigFile;
+ }
+ private SegmentGenerationJobSpec makeJobSpec(File inputDir, File outputDir,
File schemaFile, File tableConfigFile) {
SegmentGenerationJobSpec jobSpec = new SegmentGenerationJobSpec();
jobSpec.setJobType("SegmentCreation");
jobSpec.setInputDirURI(inputDir.toURI().toString());
@@ -208,19 +254,6 @@ public class SegmentGenerationJobRunnerTest {
pfsSpec.setClassName(LocalPinotFS.class.getName());
jobSpec.setPinotFSSpecs(Collections.singletonList(pfsSpec));
- SegmentGenerationJobRunner jobRunner = new
SegmentGenerationJobRunner(jobSpec);
- jobRunner.run();
-
- // Check that both segment files are created
-
- File newSegmentFile2009 = new File(outputDir,
"2009/myTable_OFFLINE_0.tar.gz");
- Assert.assertTrue(newSegmentFile2009.exists());
- Assert.assertTrue(newSegmentFile2009.isFile());
- Assert.assertTrue(newSegmentFile2009.length() > 0);
-
- File newSegmentFile2010 = new File(outputDir,
"2010/myTable_OFFLINE_0.tar.gz");
- Assert.assertTrue(newSegmentFile2010.exists());
- Assert.assertTrue(newSegmentFile2010.isFile());
- Assert.assertTrue(newSegmentFile2010.length() > 0);
+ return jobSpec;
}
}
diff --git
a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/pom.xml
b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/pom.xml
index 8d15949d3a..bb8f269c3f 100644
--- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/pom.xml
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/pom.xml
@@ -33,9 +33,9 @@
<url>https://pinot.apache.org/</url>
<properties>
<pinot.root>${basedir}/../../../..</pinot.root>
- <spark.version>2.4.0</spark.version>
- <scala.binary.version>2.11</scala.binary.version>
- <scala.version>2.11.11</scala.version>
+ <spark.version>2.4.7</spark.version>
+ <scala.binary.version>2.12</scala.binary.version>
+ <scala.version>2.12.15</scala.version>
<hadoop.version>2.10.1</hadoop.version>
<phase.prop>package</phase.prop>
</properties>
@@ -299,12 +299,20 @@
<dependency>
<groupId>com.holdenkarau</groupId>
<artifactId>spark-testing-base_${scala.binary.version}</artifactId>
- <version>${spark.version}_0.14.0</version>
+ <version>${spark.version}_1.1.1</version>
<exclusions>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>*</artifactId>
</exclusion>
+ <!-- We have to explicitly exclude spark-yarn, due to a bug in the current
+ version of the maven-enforcer-plugin, where it doesn't handle
+ wildcard exclusions in transitive dependencies. So projects that
+ depend on this project wind up not excluding spark-yarn. -->
+ <exclusion>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-yarn_${scala.binary.version}</artifactId>
+ </exclusion>
<exclusion>
<groupId>net.minidev</groupId>
<artifactId>json-smart</artifactId>
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/pom.xml
b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/pom.xml
index 0528df3207..7d342c61e2 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/pom.xml
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/pom.xml
@@ -195,6 +195,14 @@
<groupId>software.amazon.awssdk</groupId>
<artifactId>*</artifactId>
</exclusion>
+ <!-- We have to explicitly exclude kinesis, due to a bug in the current
+ version of the maven-enforcer-plugin, where it doesn't handle
+ wildcard exclusions in transitive dependencies. So projects that
+ depend on this project wind up not excluding kinesis. -->
+ <exclusion>
+ <groupId>software.amazon.awssdk</groupId>
+ <artifactId>kinesis</artifactId>
+ </exclusion>
</exclusions>
</dependency>
diff --git a/pom.xml b/pom.xml
index bf3e96c7e1..b1cc8dd375 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1410,7 +1410,8 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-enforcer-plugin</artifactId>
- <version>3.0.0-M2</version>
+ <!-- Note that 3.0.0, while newer, has bugs w/provided transitive dependencies -->
+ <version>3.0.0-M3</version>
<executions>
<execution>
<id>enforce-dependency-convergence</id>
@@ -1644,6 +1645,7 @@
<exclude>**/*.avsc</exclude>
<exclude>**/*.csv</exclude>
<exclude>**/*.desc</exclude>
+ <exclude>**/*.fst</exclude>
<exclude>**/*.parquet</exclude>
<exclude>**/*.gz</exclude>
<exclude>**/*.orc</exclude>
@@ -1679,6 +1681,9 @@
<!-- files from Eclipse -->
<exclude>**/maven-eclipse.xml</exclude>
+ <exclude>**/.settings/**</exclude>
+ <exclude>**/.project</exclude>
+ <exclude>**/.classpath</exclude>
<exclude>.externalToolBuilders/**</exclude>
</excludes>
<mapping>
@@ -1752,6 +1757,7 @@
<exclude>**/*.avsc</exclude>
<exclude>**/*.csv</exclude>
<exclude>**/*.desc</exclude>
+ <exclude>**/*.fst</exclude>
<exclude>**/*.parquet</exclude>
<exclude>**/*.gz</exclude>
<exclude>**/*.orc</exclude>
@@ -1776,6 +1782,7 @@
<exclude>**/maven-eclipse.xml</exclude>
<exclude>**/.settings/**</exclude>
<exclude>**/.project</exclude>
+ <exclude>**/.classpath</exclude>
<exclude>.externalToolBuilders/**</exclude>
<!-- Docker and Kubernetes (not part of the distribution) -->
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]