ahmedabu98 commented on code in PR #38018:
URL: https://github.com/apache/beam/pull/38018#discussion_r3018246693
##########
sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AddFiles.java:
##########
@@ -227,101 +257,170 @@ public ConvertToDataFile(
private static final String UNKNOWN_FORMAT_ERROR = "Could not determine
the file's format";
static final String UNKNOWN_PARTITION_ERROR = "Could not determine the
file's partition: ";
- @ProcessElement
- public void process(@Element String filePath, MultiOutputReceiver output)
- throws IOException, InterruptedException {
- FileFormat format;
- try {
- format = inferFormat(filePath);
- } catch (UnknownFormatException e) {
- output
- .get(ERRORS)
- .output(Row.withSchema(ERROR_SCHEMA).addValues(filePath,
UNKNOWN_FORMAT_ERROR).build());
- numErrorFiles.inc();
- return;
+ private static class ProcessResult {
+ final @Nullable SerializableDataFile dataFile;
+ final @Nullable Row errorRow;
+ final Instant timestamp;
+ final BoundedWindow window;
+
+ ProcessResult(
+ @Nullable SerializableDataFile dataFile,
+ @Nullable Row errorRow,
+ Instant timestamp,
+ BoundedWindow window) {
+ Preconditions.checkState(
+ dataFile == null || errorRow == null,
+ "Expected only one of dataFile or errorRow, but got both:%n\tfile:
%s%n\terror: %s",
+ dataFile != null ? dataFile.getPath() : null,
+ errorRow);
+ this.dataFile = dataFile;
+ this.errorRow = errorRow;
+ this.timestamp = timestamp;
+ this.window = window;
}
+ }
- if (table == null) {
- try {
- table = getOrCreateTable(filePath, format);
- } catch (FileNotFoundException e) {
- output
- .get(ERRORS)
- .output(
- Row.withSchema(ERROR_SCHEMA)
- .addValues(filePath, checkStateNotNull(e.getMessage()))
- .build());
- numErrorFiles.inc();
- return;
- }
- }
+ @Setup
+ public void setup() {
+ executor = Executors.newFixedThreadPool(THREAD_POOL_SIZE);
+ }
- // Check if the file path contains the provided prefix
- if (table.spec().isPartitioned()
- && !Strings.isNullOrEmpty(prefix)
- && !filePath.startsWith(checkStateNotNull(prefix))) {
- output
- .get(ERRORS)
- .output(Row.withSchema(ERROR_SCHEMA).addValues(filePath,
PREFIX_ERROR).build());
- numErrorFiles.inc();
- return;
+ @Teardown
+ public void teardown() {
+ if (executor != null) {
+ executor.shutdownNow();
}
+ }
- InputFile inputFile = table.io().newInputFile(filePath);
+ @StartBundle
+ public void startBundle() {
+ activeTasks = new ArrayList<>();
+ }
- Metrics metrics;
- try {
- metrics =
- getFileMetrics(
- inputFile,
- format,
- MetricsConfig.forTable(table),
- MappingUtil.create(table.schema()));
- } catch (FileNotFoundException e) {
- output
- .get(ERRORS)
- .output(
+ private Callable<ProcessResult> createProcessTask(
+ String filePath, Instant timestamp, BoundedWindow window) {
+
+ return () -> {
+ FileFormat format;
+ try {
+ format = inferFormat(filePath);
+ } catch (UnknownFormatException e) {
+ return new ProcessResult(
+ null,
+ Row.withSchema(ERROR_SCHEMA).addValues(filePath,
UNKNOWN_FORMAT_ERROR).build(),
+ timestamp,
+ window);
+ }
+
+ // Synchronize table initialization
+ if (table == null) {
+ try {
+ table = getOrCreateTable(filePath, format);
+ } catch (FileNotFoundException e) {
+ return new ProcessResult(
+ null,
Row.withSchema(ERROR_SCHEMA)
.addValues(filePath, checkStateNotNull(e.getMessage()))
- .build());
- numErrorFiles.inc();
- return;
- }
+ .build(),
+ timestamp,
+ window);
+ }
+ }
- // Figure out which partition this DataFile should go to
- String partitionPath;
- if (table.spec().isUnpartitioned()) {
- partitionPath = "";
- } else if (!Strings.isNullOrEmpty(prefix)) {
- // option 1: use directory structure to determine partition
- // Note: we don't validate the DataFile content here
- partitionPath = getPartitionFromFilePath(filePath);
- } else {
+ // Check if the file path contains the provided prefix
+ if (table.spec().isPartitioned()
+ && !Strings.isNullOrEmpty(prefix)
+ && !filePath.startsWith(checkStateNotNull(prefix))) {
+ return new ProcessResult(
+ null,
+ Row.withSchema(ERROR_SCHEMA).addValues(filePath,
PREFIX_ERROR).build(),
+ timestamp,
+ window);
+ }
+
+ InputFile inputFile = table.io().newInputFile(filePath);
+
+ Metrics metrics;
try {
- // option 2: examine DataFile min/max statistics to determine
partition
- partitionPath = getPartitionFromMetrics(metrics, inputFile, table);
- } catch (UnknownPartitionException e) {
- output
- .get(ERRORS)
- .output(
- Row.withSchema(ERROR_SCHEMA)
- .addValues(filePath, UNKNOWN_PARTITION_ERROR +
e.getMessage())
- .build());
- numErrorFiles.inc();
- return;
+ metrics =
+ getFileMetrics(
+ inputFile,
+ format,
+ MetricsConfig.forTable(table),
+ MappingUtil.create(table.schema()));
+ } catch (FileNotFoundException e) {
+ return new ProcessResult(
+ null,
+ Row.withSchema(ERROR_SCHEMA)
+ .addValues(filePath, checkStateNotNull(e.getMessage()))
+ .build(),
+ timestamp,
+ window);
}
- }
- DataFile df =
- DataFiles.builder(table.spec())
- .withPath(filePath)
- .withFormat(format)
- .withMetrics(metrics)
- .withFileSizeInBytes(inputFile.getLength())
- .withPartitionPath(partitionPath)
- .build();
+ // Figure out which partition this DataFile should go to
+ String partitionPath;
+ if (table.spec().isUnpartitioned()) {
+ partitionPath = "";
+ } else if (!Strings.isNullOrEmpty(prefix)) {
+ // option 1: use directory structure to determine partition
+ // Note: we don't validate the DataFile content here
+ partitionPath = getPartitionFromFilePath(filePath);
+ } else {
+ try {
+ // option 2: examine DataFile min/max statistics to determine
partition
+ partitionPath = getPartitionFromMetrics(metrics, inputFile, table);
+ } catch (UnknownPartitionException e) {
+ return new ProcessResult(
+ null,
+ Row.withSchema(ERROR_SCHEMA)
+ .addValues(filePath, UNKNOWN_PARTITION_ERROR +
e.getMessage())
+ .build(),
+ timestamp,
+ window);
+ }
+ }
+
+ DataFile df =
+ DataFiles.builder(table.spec())
+ .withPath(filePath)
+ .withFormat(format)
+ .withMetrics(metrics)
+ .withFileSizeInBytes(inputFile.getLength())
+ .withPartitionPath(partitionPath)
+ .build();
+
+ return new ProcessResult(
+ SerializableDataFile.from(df, partitionPath), null, timestamp,
window);
+ };
+ }
+
+ @ProcessElement
+ public void process(
+ @Element String filePath,
+ @Timestamp Instant timestamp,
+ BoundedWindow window,
+ MultiOutputReceiver output)
+ throws IOException, InterruptedException {
+
+ Callable<ProcessResult> task = createProcessTask(filePath, timestamp,
window);
+
checkStateNotNull(activeTasks).add(checkStateNotNull(executor).submit(task));
Review Comment:
Yeah, if there are too many files in one bundle, the number of queued tasks can grow without bound.
I added some backpressure logic to regulate this.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]