This is an automated email from the ASF dual-hosted git repository.
aokolnychyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 8e79259136 Spark 3.3: Remove unused RowDataRewriter (#6343)
8e79259136 is described below
commit 8e79259136601e45bfb031edde21775dc2c0879e
Author: Anton Okolnychyi <[email protected]>
AuthorDate: Thu Dec 1 13:43:42 2022 -0800
Spark 3.3: Remove unused RowDataRewriter (#6343)
---
.../iceberg/spark/source/RowDataRewriter.java | 179 ---------------------
1 file changed, 179 deletions(-)
diff --git
a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/RowDataRewriter.java
b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/RowDataRewriter.java
deleted file mode 100644
index b5022cc988..0000000000
---
a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/RowDataRewriter.java
+++ /dev/null
@@ -1,179 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iceberg.spark.source;
-
-import java.io.Serializable;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import java.util.stream.Collectors;
-import org.apache.iceberg.CombinedScanTask;
-import org.apache.iceberg.DataFile;
-import org.apache.iceberg.FileFormat;
-import org.apache.iceberg.PartitionSpec;
-import org.apache.iceberg.Schema;
-import org.apache.iceberg.Table;
-import org.apache.iceberg.TableProperties;
-import org.apache.iceberg.io.OutputFileFactory;
-import org.apache.iceberg.io.TaskWriter;
-import org.apache.iceberg.io.UnpartitionedWriter;
-import org.apache.iceberg.relocated.com.google.common.collect.Lists;
-import org.apache.iceberg.spark.SparkSchemaUtil;
-import org.apache.iceberg.util.PropertyUtil;
-import org.apache.spark.TaskContext;
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.broadcast.Broadcast;
-import org.apache.spark.sql.catalyst.InternalRow;
-import org.apache.spark.sql.types.StructType;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class RowDataRewriter implements Serializable {
-
- private static final Logger LOG =
LoggerFactory.getLogger(RowDataRewriter.class);
-
- private final Broadcast<Table> tableBroadcast;
- private final PartitionSpec spec;
- private final FileFormat format;
- private final boolean caseSensitive;
-
- public RowDataRewriter(
- Broadcast<Table> tableBroadcast, PartitionSpec spec, boolean
caseSensitive) {
- this.tableBroadcast = tableBroadcast;
- this.spec = spec;
- this.caseSensitive = caseSensitive;
-
- Table table = tableBroadcast.value();
- String formatString =
- table
- .properties()
- .getOrDefault(
- TableProperties.DEFAULT_FILE_FORMAT,
TableProperties.DEFAULT_FILE_FORMAT_DEFAULT);
- this.format = FileFormat.fromString(formatString);
- }
-
- public List<DataFile> rewriteDataForTasks(JavaRDD<CombinedScanTask> taskRDD)
{
- JavaRDD<List<DataFile>> dataFilesRDD =
taskRDD.map(this::rewriteDataForTask);
-
- return
dataFilesRDD.collect().stream().flatMap(Collection::stream).collect(Collectors.toList());
- }
-
- private List<DataFile> rewriteDataForTask(CombinedScanTask task) throws
Exception {
- TaskContext context = TaskContext.get();
- int partitionId = context.partitionId();
- long taskId = context.taskAttemptId();
-
- Table table = tableBroadcast.value();
- Schema schema = table.schema();
- Map<String, String> properties = table.properties();
-
- RowDataReader dataReader = new RowDataReader(task, table, schema,
caseSensitive);
-
- StructType structType = SparkSchemaUtil.convert(schema);
- SparkAppenderFactory appenderFactory =
- SparkAppenderFactory.builderFor(table, schema,
structType).spec(spec).build();
- OutputFileFactory fileFactory =
- OutputFileFactory.builderFor(table, partitionId, taskId)
- .defaultSpec(spec)
- .format(format)
- .build();
-
- TaskWriter<InternalRow> writer;
- if (spec.isUnpartitioned()) {
- writer =
- new UnpartitionedWriter<>(
- spec, format, appenderFactory, fileFactory, table.io(),
Long.MAX_VALUE);
- } else if (PropertyUtil.propertyAsBoolean(
- properties,
- TableProperties.SPARK_WRITE_PARTITIONED_FANOUT_ENABLED,
- TableProperties.SPARK_WRITE_PARTITIONED_FANOUT_ENABLED_DEFAULT)) {
- writer =
- new SparkPartitionedFanoutWriter(
- spec,
- format,
- appenderFactory,
- fileFactory,
- table.io(),
- Long.MAX_VALUE,
- schema,
- structType);
- } else {
- writer =
- new SparkPartitionedWriter(
- spec,
- format,
- appenderFactory,
- fileFactory,
- table.io(),
- Long.MAX_VALUE,
- schema,
- structType);
- }
-
- try {
- while (dataReader.next()) {
- InternalRow row = dataReader.get();
- writer.write(row);
- }
-
- dataReader.close();
- dataReader = null;
-
- writer.close();
- return Lists.newArrayList(writer.dataFiles());
-
- } catch (Throwable originalThrowable) {
- try {
- LOG.error("Aborting task", originalThrowable);
- context.markTaskFailed(originalThrowable);
-
- LOG.error(
- "Aborting commit for partition {} (task {}, attempt {}, stage
{}.{})",
- partitionId,
- taskId,
- context.attemptNumber(),
- context.stageId(),
- context.stageAttemptNumber());
- if (dataReader != null) {
- dataReader.close();
- }
- writer.abort();
- LOG.error(
- "Aborted commit for partition {} (task {}, attempt {}, stage
{}.{})",
- partitionId,
- taskId,
- context.taskAttemptId(),
- context.stageId(),
- context.stageAttemptNumber());
-
- } catch (Throwable inner) {
- if (originalThrowable != inner) {
- originalThrowable.addSuppressed(inner);
- LOG.warn("Suppressing exception in catch: {}", inner.getMessage(),
inner);
- }
- }
-
- if (originalThrowable instanceof Exception) {
- throw originalThrowable;
- } else {
- throw new RuntimeException(originalThrowable);
- }
- }
- }
-}