robertwb commented on code in PR #29261:
URL: https://github.com/apache/beam/pull/29261#discussion_r1379446274
##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProvider.java:
##########
@@ -183,14 +188,17 @@ public PCollectionRowTuple expand(PCollectionRowTuple
input) {
PCollectionTuple outputTuple =
kafkaValues.apply(
- ParDo.of(new ErrorFn("Kafka-read-error-counter",
valueMapper))
+ ParDo.of(new ErrorFn("Kafka-read-error-counter",
valueMapper, failOnError))
.withOutputTags(OUTPUT_TAG, TupleTagList.of(ERROR_TAG)));
- return PCollectionRowTuple.of(
- "output",
- outputTuple.get(OUTPUT_TAG).setRowSchema(beamSchema),
- "errors",
- outputTuple.get(ERROR_TAG).setRowSchema(ERROR_SCHEMA));
+ PCollectionRowTuple outputRows = PCollectionRowTuple.of("output",
+ outputTuple.get(OUTPUT_TAG).setRowSchema(beamSchema));
+
+ PCollection<Row> errorOutput =
outputTuple.get(ERROR_TAG).setRowSchema(ERROR_SCHEMA);
+ if (!failOnError) {
Review Comment:
Hmm... Can we refactor this so as not to be so duplicative here?
##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProvider.java:
##########
@@ -69,6 +67,9 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+@SuppressWarnings({
Review Comment:
Can you put this more local to the issue?
##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaWriteSchemaTransformProvider.java:
##########
@@ -103,17 +107,22 @@ public static class ErrorCounterFn extends DoFn<Row,
KV<byte[], byte[]>> {
private SerializableFunction<Row, byte[]> toBytesFn;
private Counter errorCounter;
private Long errorsInBundle = 0L;
+ private final boolean failOnError;
- public ErrorCounterFn(String name, SerializableFunction<Row, byte[]>
toBytesFn) {
+ public ErrorCounterFn(String name, SerializableFunction<Row, byte[]>
toBytesFn, boolean failOnError) {
this.toBytesFn = toBytesFn;
- errorCounter =
Metrics.counter(KafkaWriteSchemaTransformProvider.class, name);
+ this.errorCounter =
Metrics.counter(KafkaWriteSchemaTransformProvider.class, name);
+ this.failOnError = failOnError;
}
@ProcessElement
public void process(@DoFn.Element Row row, MultiOutputReceiver receiver)
{
try {
receiver.get(OUTPUT_TAG).output(KV.of(new byte[1],
toBytesFn.apply(row)));
} catch (Exception e) {
Review Comment:
Same.
##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProvider.java:
##########
@@ -262,17 +270,22 @@ public static class ErrorFn extends DoFn<byte[], Row> {
private SerializableFunction<byte[], Row> valueMapper;
private Counter errorCounter;
private Long errorsInBundle = 0L;
+ private final boolean failOnError;
- public ErrorFn(String name, SerializableFunction<byte[], Row> valueMapper)
{
+ public ErrorFn(String name, SerializableFunction<byte[], Row> valueMapper,
boolean failOnError) {
this.errorCounter =
Metrics.counter(KafkaReadSchemaTransformProvider.class, name);
this.valueMapper = valueMapper;
+ this.failOnError = failOnError;
}
@ProcessElement
public void process(@DoFn.Element byte[] msg, MultiOutputReceiver
receiver) {
try {
receiver.get(OUTPUT_TAG).output(valueMapper.apply(msg));
Review Comment:
Best practice is to not catch errors from `Receiver.output(...)`, which may
involve arbitrary downstream fused operations. Instead, catch only the errors
from `valueMapper.apply(msg)` and output outside of the try block.
##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProvider.java:
##########
@@ -183,14 +188,17 @@ public PCollectionRowTuple expand(PCollectionRowTuple
input) {
PCollectionTuple outputTuple =
kafkaValues.apply(
- ParDo.of(new ErrorFn("Kafka-read-error-counter",
valueMapper))
+ ParDo.of(new ErrorFn("Kafka-read-error-counter",
valueMapper, failOnError))
.withOutputTags(OUTPUT_TAG, TupleTagList.of(ERROR_TAG)));
- return PCollectionRowTuple.of(
- "output",
- outputTuple.get(OUTPUT_TAG).setRowSchema(beamSchema),
- "errors",
- outputTuple.get(ERROR_TAG).setRowSchema(ERROR_SCHEMA));
+ PCollectionRowTuple outputRows = PCollectionRowTuple.of("output",
+ outputTuple.get(OUTPUT_TAG).setRowSchema(beamSchema));
+
+ PCollection<Row> errorOutput =
outputTuple.get(ERROR_TAG).setRowSchema(ERROR_SCHEMA);
Review Comment:
Can we use `ErrorHandling.errorSchema(...)` here? (Likewise, below,
`ErrorHandling.errorRecord(...)`?)
##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaWriteSchemaTransformProvider.java:
##########
@@ -56,6 +57,9 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+@SuppressWarnings({
Review Comment:
Same.
##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaWriteSchemaTransformProvider.java:
##########
@@ -103,17 +107,22 @@ public static class ErrorCounterFn extends DoFn<Row,
KV<byte[], byte[]>> {
private SerializableFunction<Row, byte[]> toBytesFn;
private Counter errorCounter;
private Long errorsInBundle = 0L;
+ private final boolean failOnError;
- public ErrorCounterFn(String name, SerializableFunction<Row, byte[]>
toBytesFn) {
+ public ErrorCounterFn(String name, SerializableFunction<Row, byte[]>
toBytesFn, boolean failOnError) {
this.toBytesFn = toBytesFn;
- errorCounter =
Metrics.counter(KafkaWriteSchemaTransformProvider.class, name);
+ this.errorCounter =
Metrics.counter(KafkaWriteSchemaTransformProvider.class, name);
+ this.failOnError = failOnError;
}
@ProcessElement
public void process(@DoFn.Element Row row, MultiOutputReceiver receiver)
{
try {
receiver.get(OUTPUT_TAG).output(KV.of(new byte[1],
toBytesFn.apply(row)));
} catch (Exception e) {
+ if (failOnError) {
+ throw new RuntimeException(e.getMessage());
+ }
errorsInBundle += 1;
LOG.warn("Error while processing the element", e);
receiver
Review Comment:
Same comment about error schema.
##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProvider.java:
##########
@@ -114,6 +115,7 @@ protected SchemaTransform
from(KafkaReadSchemaTransformConfiguration configurati
consumerConfigs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
autoOffsetReset);
String format = configuration.getFormat();
+ boolean failOnError = configuration.getErrorHandling() == null;
Review Comment:
Use `!ErrorHandling.hasOutput(configuration.getErrorHandling())`. It's
possible errorHandling is set, but errorHandling.output is not. (This is also
more future compatible to other options that may be put here.)
##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaWriteSchemaTransformProvider.java:
##########
@@ -103,17 +107,22 @@ public static class ErrorCounterFn extends DoFn<Row,
KV<byte[], byte[]>> {
private SerializableFunction<Row, byte[]> toBytesFn;
private Counter errorCounter;
private Long errorsInBundle = 0L;
+ private final boolean failOnError;
- public ErrorCounterFn(String name, SerializableFunction<Row, byte[]>
toBytesFn) {
+ public ErrorCounterFn(String name, SerializableFunction<Row, byte[]>
toBytesFn, boolean failOnError) {
this.toBytesFn = toBytesFn;
- errorCounter =
Metrics.counter(KafkaWriteSchemaTransformProvider.class, name);
+ this.errorCounter =
Metrics.counter(KafkaWriteSchemaTransformProvider.class, name);
+ this.failOnError = failOnError;
}
@ProcessElement
public void process(@DoFn.Element Row row, MultiOutputReceiver receiver)
{
try {
receiver.get(OUTPUT_TAG).output(KV.of(new byte[1],
toBytesFn.apply(row)));
} catch (Exception e) {
+ if (failOnError) {
+ throw new RuntimeException(e.getMessage());
Review Comment:
Same.
##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaWriteSchemaTransformProvider.java:
##########
@@ -103,17 +107,22 @@ public static class ErrorCounterFn extends DoFn<Row,
KV<byte[], byte[]>> {
private SerializableFunction<Row, byte[]> toBytesFn;
private Counter errorCounter;
private Long errorsInBundle = 0L;
+ private final boolean failOnError;
- public ErrorCounterFn(String name, SerializableFunction<Row, byte[]>
toBytesFn) {
+ public ErrorCounterFn(String name, SerializableFunction<Row, byte[]>
toBytesFn, boolean failOnError) {
this.toBytesFn = toBytesFn;
- errorCounter =
Metrics.counter(KafkaWriteSchemaTransformProvider.class, name);
+ this.errorCounter =
Metrics.counter(KafkaWriteSchemaTransformProvider.class, name);
+ this.failOnError = failOnError;
}
@ProcessElement
public void process(@DoFn.Element Row row, MultiOutputReceiver receiver)
{
try {
receiver.get(OUTPUT_TAG).output(KV.of(new byte[1],
toBytesFn.apply(row)));
Review Comment:
Same.
##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProvider.java:
##########
@@ -262,17 +270,22 @@ public static class ErrorFn extends DoFn<byte[], Row> {
private SerializableFunction<byte[], Row> valueMapper;
private Counter errorCounter;
private Long errorsInBundle = 0L;
+ private final boolean failOnError;
- public ErrorFn(String name, SerializableFunction<byte[], Row> valueMapper)
{
+ public ErrorFn(String name, SerializableFunction<byte[], Row> valueMapper,
boolean failOnError) {
this.errorCounter =
Metrics.counter(KafkaReadSchemaTransformProvider.class, name);
this.valueMapper = valueMapper;
+ this.failOnError = failOnError;
}
@ProcessElement
public void process(@DoFn.Element byte[] msg, MultiOutputReceiver
receiver) {
try {
receiver.get(OUTPUT_TAG).output(valueMapper.apply(msg));
} catch (Exception e) {
+ if (failOnError) {
+ throw new RuntimeException(e.getMessage());
Review Comment:
This will lose the original context. You can `throw new RuntimeException(e)`.
Perhaps better would be to catch `RuntimeException` in a separate clause and
rethrow the original exception.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]