This is an automated email from the ASF dual-hosted git repository.
pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 1d4a9cc Merge pull request #15298 from [BEAM-12729] Suppress Avro
Runtime Exceptions for Streaming
1d4a9cc is described below
commit 1d4a9ccd11c14ac6f0a2de1cc438a881244ede0a
Author: Dylan Hercher <[email protected]>
AuthorDate: Mon Aug 9 14:36:48 2021 -0700
Merge pull request #15298 from [BEAM-12729] Suppress Avro Runtime
Exceptions for Streaming
* add readerThreadCount variable to allow file reads to control threads
after the reshuffle occurs
* supply type inference
* spotless
* chain old constructor for backwards compat
* add nullable checker
* add testing of threads
* use valueof
* adding nullable to the var
* add suppress notifications logic
* add cast to null
* cast to integer
* linting
* test removing num buckets
* use numBuckets
* shift from threading to controlling if reshuffle is used
* spotless
* adding experimental
* rename second test
* fixing checkstyle errors
* spell check
* adding a file exception handler
Co-authored-by: Pablo <[email protected]>
---
.../main/java/org/apache/beam/sdk/io/AvroIO.java | 46 ++++++++++++++++++----
.../beam/sdk/io/ReadAllViaFileBasedSource.java | 41 +++++++++++++++++--
2 files changed, 75 insertions(+), 12 deletions(-)
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
index 7c7f4c3..353d822 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
@@ -18,6 +18,7 @@
package org.apache.beam.sdk.io;
import static org.apache.beam.sdk.io.FileIO.ReadMatches.DirectoryTreatment;
+import static
org.apache.beam.sdk.io.ReadAllViaFileBasedSource.ReadFileRangesFnExceptionHandler;
import static
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
import static
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull;
@@ -359,7 +360,8 @@ public class AvroIO {
.setSchema(ReflectData.get().getSchema(recordClass))
.setInferBeamSchema(false)
.setDesiredBundleSizeBytes(DEFAULT_BUNDLE_SIZE_BYTES)
- .setUsesReshuffle(DEFAULT_USES_RESHUFFLE)
+ .setUsesReshuffle(ReadAllViaFileBasedSource.DEFAULT_USES_RESHUFFLE)
+ .setFileExceptionHandler(new ReadFileRangesFnExceptionHandler())
.build();
}
@@ -402,7 +404,8 @@ public class AvroIO {
.setSchema(schema)
.setInferBeamSchema(false)
.setDesiredBundleSizeBytes(DEFAULT_BUNDLE_SIZE_BYTES)
- .setUsesReshuffle(DEFAULT_USES_RESHUFFLE)
+ .setUsesReshuffle(ReadAllViaFileBasedSource.DEFAULT_USES_RESHUFFLE)
+ .setFileExceptionHandler(new ReadFileRangesFnExceptionHandler())
.build();
}
@@ -474,7 +477,8 @@ public class AvroIO {
return new AutoValue_AvroIO_ParseFiles.Builder<T>()
.setParseFn(parseFn)
.setDesiredBundleSizeBytes(DEFAULT_BUNDLE_SIZE_BYTES)
- .setUsesReshuffle(DEFAULT_USES_RESHUFFLE)
+ .setUsesReshuffle(ReadAllViaFileBasedSource.DEFAULT_USES_RESHUFFLE)
+ .setFileExceptionHandler(new ReadFileRangesFnExceptionHandler())
.build();
}
@@ -582,9 +586,6 @@ public class AvroIO {
*/
private static final long DEFAULT_BUNDLE_SIZE_BYTES = 64 * 1024 * 1024L;
- /** ReShuffle before avro file reads by default. */
- private static final boolean DEFAULT_USES_RESHUFFLE = true;
-
/** Implementation of {@link #read} and {@link #readGenericRecords}. */
@AutoValue
public abstract static class Read<T> extends PTransform<PBegin,
PCollection<T>> {
@@ -761,6 +762,8 @@ public class AvroIO {
abstract boolean getUsesReshuffle();
+ abstract ReadFileRangesFnExceptionHandler getFileExceptionHandler();
+
abstract long getDesiredBundleSizeBytes();
abstract boolean getInferBeamSchema();
@@ -777,6 +780,9 @@ public class AvroIO {
abstract Builder<T> setUsesReshuffle(boolean usesReshuffle);
+ abstract Builder<T> setFileExceptionHandler(
+ ReadFileRangesFnExceptionHandler exceptionHandler);
+
abstract Builder<T> setDesiredBundleSizeBytes(long
desiredBundleSizeBytes);
abstract Builder<T> setInferBeamSchema(boolean infer);
@@ -797,6 +803,13 @@ public class AvroIO {
return toBuilder().setUsesReshuffle(usesReshuffle).build();
}
+ /** Specifies if exceptions should be logged only for streaming pipelines.
*/
+ @Experimental(Kind.FILESYSTEM)
+ public ReadFiles<T> withFileExceptionHandler(
+ ReadFileRangesFnExceptionHandler exceptionHandler) {
+ return toBuilder().setFileExceptionHandler(exceptionHandler).build();
+ }
+
/**
* If set to true, a Beam schema will be inferred from the AVRO schema.
This allows the output
* to be used by SQL and by the schema-transform library.
@@ -821,7 +834,8 @@ public class AvroIO {
new CreateSourceFn<>(
getRecordClass(), getSchema().toString(),
getDatumReaderFactory()),
AvroCoder.of(getRecordClass(), getSchema()),
- getUsesReshuffle()));
+ getUsesReshuffle(),
+ getFileExceptionHandler()));
return getInferBeamSchema() ? setBeamSchema(read, getRecordClass(),
getSchema()) : read;
}
@@ -1094,6 +1108,8 @@ public class AvroIO {
abstract boolean getUsesReshuffle();
+ abstract ReadFileRangesFnExceptionHandler getFileExceptionHandler();
+
abstract long getDesiredBundleSizeBytes();
abstract Builder<T> toBuilder();
@@ -1106,6 +1122,9 @@ public class AvroIO {
abstract Builder<T> setUsesReshuffle(boolean usesReshuffle);
+ abstract Builder<T> setFileExceptionHandler(
+ ReadFileRangesFnExceptionHandler exceptionHandler);
+
abstract Builder<T> setDesiredBundleSizeBytes(long
desiredBundleSizeBytes);
abstract ParseFiles<T> build();
@@ -1122,6 +1141,13 @@ public class AvroIO {
return toBuilder().setUsesReshuffle(usesReshuffle).build();
}
+ /** Specifies if exceptions should be logged only for streaming pipelines.
*/
+ @Experimental(Kind.FILESYSTEM)
+ public ParseFiles<T> withFileExceptionHandler(
+ ReadFileRangesFnExceptionHandler exceptionHandler) {
+ return toBuilder().setFileExceptionHandler(exceptionHandler).build();
+ }
+
@VisibleForTesting
ParseFiles<T> withDesiredBundleSizeBytes(long desiredBundleSizeBytes) {
return
toBuilder().setDesiredBundleSizeBytes(desiredBundleSizeBytes).build();
@@ -1137,7 +1163,11 @@ public class AvroIO {
return input.apply(
"Parse Files via FileBasedSource",
new ReadAllViaFileBasedSource<>(
- getDesiredBundleSizeBytes(), createSource, coder,
getUsesReshuffle()));
+ getDesiredBundleSizeBytes(),
+ createSource,
+ coder,
+ getUsesReshuffle(),
+ getFileExceptionHandler()));
}
@Override
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ReadAllViaFileBasedSource.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ReadAllViaFileBasedSource.java
index 552396f..f21ba97 100644
---
a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ReadAllViaFileBasedSource.java
+++
b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ReadAllViaFileBasedSource.java
@@ -19,6 +19,7 @@ package org.apache.beam.sdk.io;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.io.IOException;
+import java.io.Serializable;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.annotations.Experimental.Kind;
import org.apache.beam.sdk.coders.Coder;
@@ -33,6 +34,8 @@ import org.apache.beam.sdk.transforms.Reshuffle;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Reads each file in the input {@link PCollection} of {@link ReadableFile}
using given parameters
@@ -45,27 +48,38 @@ import org.apache.beam.sdk.values.PCollection;
@Experimental(Kind.SOURCE_SINK)
public class ReadAllViaFileBasedSource<T>
extends PTransform<PCollection<ReadableFile>, PCollection<T>> {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(ReadAllViaFileBasedSource.class);
+ protected static final boolean DEFAULT_USES_RESHUFFLE = true;
private final long desiredBundleSizeBytes;
private final SerializableFunction<String, ? extends FileBasedSource<T>>
createSource;
private final Coder<T> coder;
+ private final ReadFileRangesFnExceptionHandler exceptionHandler;
private final boolean usesReshuffle;
public ReadAllViaFileBasedSource(
long desiredBundleSizeBytes,
SerializableFunction<String, ? extends FileBasedSource<T>> createSource,
Coder<T> coder) {
- this(desiredBundleSizeBytes, createSource, coder, true);
+ this(
+ desiredBundleSizeBytes,
+ createSource,
+ coder,
+ DEFAULT_USES_RESHUFFLE,
+ new ReadFileRangesFnExceptionHandler());
}
public ReadAllViaFileBasedSource(
long desiredBundleSizeBytes,
SerializableFunction<String, ? extends FileBasedSource<T>> createSource,
Coder<T> coder,
- boolean usesReshuffle) {
+ boolean usesReshuffle,
+ ReadFileRangesFnExceptionHandler exceptionHandler) {
this.desiredBundleSizeBytes = desiredBundleSizeBytes;
this.createSource = createSource;
this.coder = coder;
this.usesReshuffle = usesReshuffle;
+ this.exceptionHandler = exceptionHandler;
}
@Override
@@ -76,7 +90,7 @@ public class ReadAllViaFileBasedSource<T>
ranges = ranges.apply("Reshuffle", Reshuffle.viaRandomKey());
}
return ranges
- .apply("Read ranges", ParDo.of(new ReadFileRangesFn<>(createSource)))
+ .apply("Read ranges", ParDo.of(new ReadFileRangesFn<T>(createSource,
exceptionHandler)))
.setCoder(coder);
}
@@ -103,10 +117,13 @@ public class ReadAllViaFileBasedSource<T>
private static class ReadFileRangesFn<T> extends DoFn<KV<ReadableFile,
OffsetRange>, T> {
private final SerializableFunction<String, ? extends FileBasedSource<T>>
createSource;
+ private final ReadFileRangesFnExceptionHandler exceptionHandler;
private ReadFileRangesFn(
- SerializableFunction<String, ? extends FileBasedSource<T>>
createSource) {
+ SerializableFunction<String, ? extends FileBasedSource<T>>
createSource,
+ ReadFileRangesFnExceptionHandler exceptionHandler) {
this.createSource = createSource;
+ this.exceptionHandler = exceptionHandler;
}
@ProcessElement
@@ -126,7 +143,23 @@ public class ReadAllViaFileBasedSource<T>
for (boolean more = reader.start(); more; more = reader.advance()) {
c.output(reader.getCurrent());
}
+ } catch (RuntimeException e) {
+ if (exceptionHandler.apply(file, range, e)) {
+ throw e;
+ }
}
}
}
+
+ /** A class to handle errors which occur during file reads. */
+ public static class ReadFileRangesFnExceptionHandler implements Serializable
{
+
+ /*
+ * Applies the desired handler logic to the given exception and returns
+ * if the exception should be thrown.
+ */
+ public boolean apply(ReadableFile file, OffsetRange range, Exception e) {
+ return true;
+ }
+ }
}