This is an automated email from the ASF dual-hosted git repository.
wombatukun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
     new dfe322094005 refactor: Remove not used classes from `org.apache.hudi.spark.internal` (#18211)
dfe322094005 is described below
commit dfe322094005cd820699a7459ff8351bebccbb67
Author: Geser Dugarov <[email protected]>
AuthorDate: Tue Feb 17 09:04:19 2026 +0700
    refactor: Remove not used classes from `org.apache.hudi.spark.internal` (#18211)
---
.../apache/hudi/internal/BaseDefaultSource.java | 49 ---
.../hudi/internal/BaseWriterCommitMessage.java | 39 ---
.../internal/DataSourceInternalWriterHelper.java | 111 -------
.../apache/hudi/spark/internal/DefaultSource.java | 66 ----
.../HoodieBulkInsertDataInternalWriter.java | 79 -----
.../HoodieBulkInsertDataInternalWriterFactory.java | 57 ----
.../HoodieDataSourceInternalBatchWrite.java | 97 ------
.../HoodieDataSourceInternalBatchWriteBuilder.java | 64 ----
.../internal/HoodieDataSourceInternalTable.java | 87 ------
.../spark/internal/HoodieWriterCommitMessage.java | 37 ---
.../TestHoodieBulkInsertDataInternalWriter.java | 166 ----------
.../TestHoodieDataSourceInternalBatchWrite.java | 346 ---------------------
12 files changed, 1198 deletions(-)
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/internal/BaseDefaultSource.java b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/internal/BaseDefaultSource.java
deleted file mode 100644
index 9d2bcec94385..000000000000
--- a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/internal/BaseDefaultSource.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.internal;
-
-import org.apache.hudi.hadoop.fs.HadoopFSUtils;
-import org.apache.hudi.storage.StorageConfiguration;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.spark.sql.SparkSession;
-
-/**
- * Base class for DefaultSource used by Spark datasource v2.
- */
-public class BaseDefaultSource {
-
- protected SparkSession sparkSession = null;
- protected StorageConfiguration<Configuration> configuration = null;
-
- protected SparkSession getSparkSession() {
- if (sparkSession == null) {
- sparkSession = SparkSession.builder().getOrCreate();
- }
- return sparkSession;
- }
-
- protected StorageConfiguration<Configuration> getConfiguration() {
- if (configuration == null) {
- this.configuration = HadoopFSUtils.getStorageConf(
- getSparkSession().sparkContext().hadoopConfiguration());
- }
- return configuration;
- }
-}
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/internal/BaseWriterCommitMessage.java b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/internal/BaseWriterCommitMessage.java
deleted file mode 100644
index 1ec84269b6b7..000000000000
--- a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/internal/BaseWriterCommitMessage.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.internal;
-
-import org.apache.hudi.client.WriteStatus;
-
-import lombok.AllArgsConstructor;
-import lombok.Getter;
-import lombok.ToString;
-
-import java.io.Serializable;
-import java.util.List;
-
-/**
- * Base class for HoodieWriterCommitMessage used by Spark datasource v2.
- */
-@AllArgsConstructor
-@Getter
-@ToString
-public class BaseWriterCommitMessage implements Serializable {
-
- private final List<WriteStatus> writeStatuses;
-}
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/internal/DataSourceInternalWriterHelper.java b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/internal/DataSourceInternalWriterHelper.java
deleted file mode 100644
index 902235a2acbd..000000000000
--- a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/internal/DataSourceInternalWriterHelper.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.internal;
-
-import org.apache.hudi.client.SparkRDDWriteClient;
-import org.apache.hudi.client.TableWriteStats;
-import org.apache.hudi.client.WriteStatus;
-import org.apache.hudi.client.common.HoodieSparkEngineContext;
-import org.apache.hudi.common.model.HoodieWriteStat;
-import org.apache.hudi.common.model.WriteOperationType;
-import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.table.timeline.HoodieInstant.State;
-import org.apache.hudi.common.util.CommitUtils;
-import org.apache.hudi.common.util.Option;
-import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.exception.HoodieException;
-import org.apache.hudi.storage.StorageConfiguration;
-import org.apache.hudi.table.HoodieTable;
-
-import lombok.Getter;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.sql.SparkSession;
-import org.apache.spark.sql.types.StructType;
-
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.stream.Collectors;
-
-/**
- * Helper class for HoodieDataSourceInternalWriter used by Spark datasource v2.
- */
-@Slf4j
-public class DataSourceInternalWriterHelper {
-
- public static final String INSTANT_TIME_OPT_KEY = "hoodie.instant.time";
-
- private final String instantTime;
- private final HoodieTableMetaClient metaClient;
- private final SparkRDDWriteClient writeClient;
- @Getter
- private final HoodieTable hoodieTable;
- @Getter
- private final WriteOperationType writeOperationType;
- private final Map<String, String> extraMetadata;
-
-  public DataSourceInternalWriterHelper(String instantTime, HoodieWriteConfig writeConfig, StructType structType,
-                                        SparkSession sparkSession, StorageConfiguration<?> storageConf, Map<String, String> extraMetadata) {
-    this.instantTime = instantTime;
-    this.writeOperationType = WriteOperationType.BULK_INSERT;
-    this.extraMetadata = extraMetadata;
-    this.writeClient = new SparkRDDWriteClient<>(new HoodieSparkEngineContext(new JavaSparkContext(sparkSession.sparkContext())), writeConfig);
-    this.writeClient.setOperationType(writeOperationType);
-    this.hoodieTable = this.writeClient.initTable(writeOperationType, Option.of(instantTime));
-
-    this.metaClient = HoodieTableMetaClient.builder()
-        .setConf(storageConf.newInstance()).setBasePath(writeConfig.getBasePath()).build();
-    this.writeClient.validateAgainstTableProperties(this.metaClient.getTableConfig(), writeConfig);
-    this.writeClient.preWrite(instantTime, WriteOperationType.BULK_INSERT, metaClient);
-  }
-
- public boolean useCommitCoordinator() {
- return true;
- }
-
- public void onDataWriterCommit(String message) {
- log.info("Received commit of a data writer = {}", message);
- }
-
- public void commit(List<WriteStatus> writeStatuses) {
- try {
-      List<HoodieWriteStat> writeStatList = writeStatuses.stream().map(WriteStatus::getStat).collect(Collectors.toList());
-      writeClient.commitStats(instantTime, new TableWriteStats(writeStatList), Option.of(extraMetadata),
-          CommitUtils.getCommitActionType(writeOperationType, metaClient.getTableType()), Collections.emptyMap(), Option.empty(),
-          true, Option.empty());
- } catch (Exception ioe) {
- throw new HoodieException(ioe.getMessage(), ioe);
- } finally {
- writeClient.close();
- }
- }
-
- public void abort() {
- log.error("Commit {} aborted ", instantTime);
- writeClient.close();
- }
-
- public String createInflightCommit() {
- metaClient.getActiveTimeline().transitionRequestedToInflight(
- metaClient.createNewInstant(State.REQUESTED,
-            CommitUtils.getCommitActionType(writeOperationType, metaClient.getTableType()), instantTime), Option.empty());
- return instantTime;
- }
-}
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/spark/internal/DefaultSource.java b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/spark/internal/DefaultSource.java
deleted file mode 100644
index 1e98ba781e2b..000000000000
--- a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/spark/internal/DefaultSource.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.spark.internal;
-
-import org.apache.hudi.DataSourceUtils;
-import org.apache.hudi.common.table.HoodieTableConfig;
-import org.apache.hudi.config.HoodieInternalConfig;
-import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.internal.BaseDefaultSource;
-import org.apache.hudi.internal.DataSourceInternalWriterHelper;
-
-import org.apache.spark.sql.HoodieDataTypeUtils;
-import org.apache.spark.sql.connector.catalog.Table;
-import org.apache.spark.sql.connector.catalog.TableProvider;
-import org.apache.spark.sql.connector.expressions.Transform;
-import org.apache.spark.sql.types.StructType;
-import org.apache.spark.sql.util.CaseInsensitiveStringMap;
-
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * DataSource V2 implementation for managing internal write logic. Only called internally.
- * This class is only compatible with datasource V2 API in Spark 3.
- */
-public class DefaultSource extends BaseDefaultSource implements TableProvider {
-
- @Override
-  public StructType inferSchema(CaseInsensitiveStringMap options) {
-    String jsonSchema = options.get(HoodieInternalConfig.BULKINSERT_INPUT_DATA_SCHEMA_DDL.key());
-    return HoodieDataTypeUtils.parseStructTypeFromJson(jsonSchema);
-  }
-
-  @Override
-  public Table getTable(StructType schema, Transform[] partitioning, Map<String, String> properties) {
-    String instantTime = properties.get(DataSourceInternalWriterHelper.INSTANT_TIME_OPT_KEY);
-    String path = properties.get("path");
-    String tblName = properties.get(HoodieWriteConfig.TBL_NAME.key());
-    boolean populateMetaFields = Boolean.parseBoolean(properties.getOrDefault(HoodieTableConfig.POPULATE_META_FIELDS.key(),
-        Boolean.toString(HoodieTableConfig.POPULATE_META_FIELDS.defaultValue())));
-    boolean arePartitionRecordsSorted = Boolean.parseBoolean(properties.getOrDefault(HoodieInternalConfig.BULKINSERT_ARE_PARTITIONER_RECORDS_SORTED,
-        Boolean.toString(HoodieInternalConfig.DEFAULT_BULKINSERT_ARE_PARTITIONER_RECORDS_SORTED)));
-    // Create a new map as the properties is an unmodifiableMap on Spark 3.2.0
-    Map<String, String> newProps = new HashMap<>(properties);
-    // 1st arg to createHoodieConfig is not really required to be set. but passing it anyways.
-    HoodieWriteConfig config = DataSourceUtils.createHoodieConfig(newProps.get(HoodieWriteConfig.AVRO_SCHEMA_STRING.key()), path, tblName, newProps);
-    return new HoodieDataSourceInternalTable(instantTime, config, schema, getSparkSession(),
-        getConfiguration(), newProps, populateMetaFields, arePartitionRecordsSorted);
-  }
-}
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/spark/internal/HoodieBulkInsertDataInternalWriter.java b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/spark/internal/HoodieBulkInsertDataInternalWriter.java
deleted file mode 100644
index ff0e00371d59..000000000000
--- a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/spark/internal/HoodieBulkInsertDataInternalWriter.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.spark.internal;
-
-import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.index.HoodieIndex;
-import org.apache.hudi.table.HoodieTable;
-import org.apache.hudi.table.action.commit.BucketBulkInsertDataInternalWriterHelper;
-import org.apache.hudi.table.action.commit.BulkInsertDataInternalWriterHelper;
-import org.apache.hudi.table.action.commit.ConsistentBucketBulkInsertDataInternalWriterHelper;
-
-import org.apache.spark.sql.catalyst.InternalRow;
-import org.apache.spark.sql.connector.write.DataWriter;
-import org.apache.spark.sql.connector.write.WriterCommitMessage;
-import org.apache.spark.sql.types.StructType;
-
-import java.io.IOException;
-
-/**
- * Hoodie's Implementation of {@link DataWriter<InternalRow>}. This is used in data source "hudi.spark.internal" implementation for bulk insert.
- */
-public class HoodieBulkInsertDataInternalWriter implements DataWriter<InternalRow> {
-
- private final BulkInsertDataInternalWriterHelper bulkInsertWriterHelper;
-
-  public HoodieBulkInsertDataInternalWriter(HoodieTable hoodieTable, HoodieWriteConfig writeConfig,
-                                            String instantTime, int taskPartitionId, long taskId, StructType structType, boolean populateMetaFields,
-                                            boolean arePartitionRecordsSorted) {
-
-    if (writeConfig.getIndexType() == HoodieIndex.IndexType.BUCKET) {
-      if (writeConfig.getBucketIndexEngineType() == HoodieIndex.BucketIndexEngineType.SIMPLE) {
-        this.bulkInsertWriterHelper = new BucketBulkInsertDataInternalWriterHelper(hoodieTable,
-            writeConfig, instantTime, taskPartitionId, taskId, 0, structType, populateMetaFields, arePartitionRecordsSorted);
-      } else {
-        this.bulkInsertWriterHelper = new ConsistentBucketBulkInsertDataInternalWriterHelper(hoodieTable,
-            writeConfig, instantTime, taskPartitionId, taskId, 0, structType, populateMetaFields, arePartitionRecordsSorted);
-      }
-    } else {
-      this.bulkInsertWriterHelper = new BulkInsertDataInternalWriterHelper(hoodieTable,
-          writeConfig, instantTime, taskPartitionId, taskId, 0, structType, populateMetaFields, arePartitionRecordsSorted);
-    }
-  }
-
- @Override
- public void write(InternalRow record) throws IOException {
- bulkInsertWriterHelper.write(record);
- }
-
- @Override
- public WriterCommitMessage commit() throws IOException {
-    return new HoodieWriterCommitMessage(bulkInsertWriterHelper.getWriteStatuses());
- }
-
- @Override
- public void abort() {
- bulkInsertWriterHelper.abort();
- }
-
- @Override
- public void close() throws IOException {
- bulkInsertWriterHelper.close();
- }
-}
\ No newline at end of file
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/spark/internal/HoodieBulkInsertDataInternalWriterFactory.java b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/spark/internal/HoodieBulkInsertDataInternalWriterFactory.java
deleted file mode 100644
index 7704b07b984e..000000000000
--- a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/spark/internal/HoodieBulkInsertDataInternalWriterFactory.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.spark.internal;
-
-import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.table.HoodieTable;
-
-import org.apache.spark.sql.catalyst.InternalRow;
-import org.apache.spark.sql.connector.write.DataWriter;
-import org.apache.spark.sql.connector.write.DataWriterFactory;
-import org.apache.spark.sql.types.StructType;
-
-/**
- * Factory to assist in instantiating {@link HoodieBulkInsertDataInternalWriter}.
- */
-public class HoodieBulkInsertDataInternalWriterFactory implements DataWriterFactory {
-
- private final String instantTime;
- private final HoodieTable hoodieTable;
- private final HoodieWriteConfig writeConfig;
- private final StructType structType;
- private final boolean populateMetaFields;
- private final boolean arePartitionRecordsSorted;
-
-  public HoodieBulkInsertDataInternalWriterFactory(HoodieTable hoodieTable, HoodieWriteConfig writeConfig,
-                                                   String instantTime, StructType structType, boolean populateMetaFields,
-                                                   boolean arePartitionRecordsSorted) {
- this.hoodieTable = hoodieTable;
- this.writeConfig = writeConfig;
- this.instantTime = instantTime;
- this.structType = structType;
- this.populateMetaFields = populateMetaFields;
- this.arePartitionRecordsSorted = arePartitionRecordsSorted;
- }
-
- @Override
- public DataWriter<InternalRow> createWriter(int partitionId, long taskId) {
-    return new HoodieBulkInsertDataInternalWriter(hoodieTable, writeConfig, instantTime, partitionId, taskId,
- structType, populateMetaFields, arePartitionRecordsSorted);
- }
-}
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/spark/internal/HoodieDataSourceInternalBatchWrite.java b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/spark/internal/HoodieDataSourceInternalBatchWrite.java
deleted file mode 100644
index 551cc340ed51..000000000000
--- a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/spark/internal/HoodieDataSourceInternalBatchWrite.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.spark.internal;
-
-import org.apache.hudi.DataSourceUtils;
-import org.apache.hudi.client.WriteStatus;
-import org.apache.hudi.common.model.WriteOperationType;
-import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.internal.DataSourceInternalWriterHelper;
-import org.apache.hudi.storage.StorageConfiguration;
-
-import org.apache.spark.sql.SparkSession;
-import org.apache.spark.sql.connector.write.BatchWrite;
-import org.apache.spark.sql.connector.write.DataWriterFactory;
-import org.apache.spark.sql.connector.write.PhysicalWriteInfo;
-import org.apache.spark.sql.connector.write.WriterCommitMessage;
-import org.apache.spark.sql.types.StructType;
-
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.stream.Collectors;
-
-/**
- * Implementation of {@link BatchWrite} for datasource "hudi.spark.internal" to be used in datasource implementation
- * of bulk insert.
- */
-public class HoodieDataSourceInternalBatchWrite implements BatchWrite {
-
- private final HoodieWriteConfig writeConfig;
- private final StructType structType;
- private final boolean arePartitionRecordsSorted;
- private final boolean populateMetaFields;
- private final DataSourceInternalWriterHelper dataSourceInternalWriterHelper;
- private Map<String, String> extraMetadata = new HashMap<>();
-
-  public HoodieDataSourceInternalBatchWrite(String instantTime, HoodieWriteConfig writeConfig, StructType structType,
-                                            SparkSession jss, StorageConfiguration<?> storageConf, Map<String, String> properties, boolean populateMetaFields, boolean arePartitionRecordsSorted) {
-    this.writeConfig = writeConfig;
-    this.structType = structType;
-    this.populateMetaFields = populateMetaFields;
-    this.arePartitionRecordsSorted = arePartitionRecordsSorted;
-    this.extraMetadata = DataSourceUtils.getExtraMetadata(properties);
-    this.dataSourceInternalWriterHelper = new DataSourceInternalWriterHelper(instantTime, writeConfig, structType,
-        jss, storageConf, extraMetadata);
- }
-
- @Override
- public DataWriterFactory createBatchWriterFactory(PhysicalWriteInfo info) {
- String instantTime = dataSourceInternalWriterHelper.createInflightCommit();
-    if (WriteOperationType.BULK_INSERT == dataSourceInternalWriterHelper.getWriteOperationType()) {
-      return new HoodieBulkInsertDataInternalWriterFactory(dataSourceInternalWriterHelper.getHoodieTable(),
-          writeConfig, instantTime, structType, populateMetaFields, arePartitionRecordsSorted);
-    } else {
-      throw new IllegalArgumentException("Write Operation Type + " + dataSourceInternalWriterHelper.getWriteOperationType() + " not supported ");
- }
- }
-
- @Override
- public boolean useCommitCoordinator() {
- return dataSourceInternalWriterHelper.useCommitCoordinator();
- }
-
- @Override
- public void onDataWriterCommit(WriterCommitMessage message) {
- dataSourceInternalWriterHelper.onDataWriterCommit(message.toString());
- }
-
- @Override
- public void commit(WriterCommitMessage[] messages) {
-    List<WriteStatus> writeStatuses = Arrays.stream(messages).map(m -> (HoodieWriterCommitMessage) m)
-        .flatMap(m -> m.getWriteStatuses().stream()).collect(Collectors.toList());
- dataSourceInternalWriterHelper.commit(writeStatuses);
- }
-
- @Override
- public void abort(WriterCommitMessage[] messages) {
- dataSourceInternalWriterHelper.abort();
- }
-}
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/spark/internal/HoodieDataSourceInternalBatchWriteBuilder.java b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/spark/internal/HoodieDataSourceInternalBatchWriteBuilder.java
deleted file mode 100644
index 204883036ac8..000000000000
--- a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/spark/internal/HoodieDataSourceInternalBatchWriteBuilder.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.spark.internal;
-
-import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.storage.StorageConfiguration;
-
-import org.apache.spark.sql.SparkSession;
-import org.apache.spark.sql.connector.write.BatchWrite;
-import org.apache.spark.sql.connector.write.WriteBuilder;
-import org.apache.spark.sql.types.StructType;
-
-import java.util.Map;
-
-/**
- * Implementation of {@link WriteBuilder} for datasource "hudi.spark.internal" to be used in datasource implementation
- * of bulk insert.
- */
-public class HoodieDataSourceInternalBatchWriteBuilder implements WriteBuilder {
-
- private final String instantTime;
- private final HoodieWriteConfig writeConfig;
- private final StructType structType;
- private final SparkSession jss;
- private final StorageConfiguration<?> storageConf;
- private final Map<String, String> properties;
- private final boolean populateMetaFields;
- private final boolean arePartitionRecordsSorted;
-
-  public HoodieDataSourceInternalBatchWriteBuilder(String instantTime, HoodieWriteConfig writeConfig, StructType structType,
-                                                   SparkSession jss, StorageConfiguration<?> storageConf, Map<String, String> properties, boolean populateMetaFields,
-                                                   boolean arePartitionRecordsSorted) {
- this.instantTime = instantTime;
- this.writeConfig = writeConfig;
- this.structType = structType;
- this.jss = jss;
- this.storageConf = storageConf;
- this.properties = properties;
- this.populateMetaFields = populateMetaFields;
- this.arePartitionRecordsSorted = arePartitionRecordsSorted;
- }
-
- @Override
- public BatchWrite buildForBatch() {
-    return new HoodieDataSourceInternalBatchWrite(instantTime, writeConfig, structType, jss,
-        storageConf, properties, populateMetaFields, arePartitionRecordsSorted);
- }
-}
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/spark/internal/HoodieDataSourceInternalTable.java b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/spark/internal/HoodieDataSourceInternalTable.java
deleted file mode 100644
index f1acffeb8575..000000000000
--- a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/spark/internal/HoodieDataSourceInternalTable.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.spark.internal;
-
-import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.storage.StorageConfiguration;
-
-import org.apache.spark.sql.SparkSession;
-import org.apache.spark.sql.connector.catalog.SupportsWrite;
-import org.apache.spark.sql.connector.catalog.TableCapability;
-import org.apache.spark.sql.connector.write.LogicalWriteInfo;
-import org.apache.spark.sql.connector.write.WriteBuilder;
-import org.apache.spark.sql.types.StructType;
-
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-
-/**
- * Hoodie's Implementation of {@link SupportsWrite}. This is used in data source "hudi.spark.internal" implementation for bulk insert.
- */
-class HoodieDataSourceInternalTable implements SupportsWrite {
-
- private final String instantTime;
- private final HoodieWriteConfig writeConfig;
- private final StructType structType;
- private final SparkSession jss;
- private final StorageConfiguration<?> storageConf;
- private final boolean arePartitionRecordsSorted;
- private final Map<String, String> properties;
- private final boolean populateMetaFields;
-
-  public HoodieDataSourceInternalTable(String instantTime, HoodieWriteConfig config,
-                                       StructType schema, SparkSession jss, StorageConfiguration<?> storageConf, Map<String, String> properties,
-                                       boolean populateMetaFields, boolean arePartitionRecordsSorted) {
- this.instantTime = instantTime;
- this.writeConfig = config;
- this.structType = schema;
- this.jss = jss;
- this.storageConf = storageConf;
- this.properties = properties;
- this.populateMetaFields = populateMetaFields;
- this.arePartitionRecordsSorted = arePartitionRecordsSorted;
- }
-
- @Override
- public String name() {
- return this.getClass().toString();
- }
-
- @Override
- public StructType schema() {
- return structType;
- }
-
- @Override
- public Set<TableCapability> capabilities() {
- return new HashSet<TableCapability>() {
- {
- add(TableCapability.BATCH_WRITE);
- add(TableCapability.TRUNCATE);
- }
- };
- }
-
- @Override
- public WriteBuilder newWriteBuilder(LogicalWriteInfo logicalWriteInfo) {
-    return new HoodieDataSourceInternalBatchWriteBuilder(instantTime, writeConfig, structType, jss,
-        storageConf, properties, populateMetaFields, arePartitionRecordsSorted);
- }
-}
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/spark/internal/HoodieWriterCommitMessage.java b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/spark/internal/HoodieWriterCommitMessage.java
deleted file mode 100644
index a955d68accba..000000000000
--- a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/spark/internal/HoodieWriterCommitMessage.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.spark.internal;
-
-import org.apache.hudi.client.WriteStatus;
-import org.apache.hudi.internal.BaseWriterCommitMessage;
-
-import org.apache.spark.sql.connector.write.WriterCommitMessage;
-
-import java.util.List;
-
-/**
- * Hoodie's {@link WriterCommitMessage} used in datasource "hudi.spark.internal" implementation.
- */
-public class HoodieWriterCommitMessage extends BaseWriterCommitMessage
- implements WriterCommitMessage {
-
- public HoodieWriterCommitMessage(List<WriteStatus> writeStatuses) {
- super(writeStatuses);
- }
-}
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/spark/internal/TestHoodieBulkInsertDataInternalWriter.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/spark/internal/TestHoodieBulkInsertDataInternalWriter.java
deleted file mode 100644
index 200e37f65773..000000000000
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/spark/internal/TestHoodieBulkInsertDataInternalWriter.java
+++ /dev/null
@@ -1,166 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.spark.internal;
-
-import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
-import org.apache.hudi.common.util.Option;
-import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.internal.HoodieBulkInsertInternalWriterTestBase;
-import org.apache.hudi.table.HoodieSparkTable;
-import org.apache.hudi.table.HoodieTable;
-
-import org.apache.spark.sql.Dataset;
-import org.apache.spark.sql.Row;
-import org.apache.spark.sql.catalyst.InternalRow;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.Arguments;
-import org.junit.jupiter.params.provider.MethodSource;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.stream.Stream;
-
-import static org.apache.hudi.testutils.SparkDatasetTestUtils.ENCODER;
-import static org.apache.hudi.testutils.SparkDatasetTestUtils.STRUCT_TYPE;
-import static org.apache.hudi.testutils.SparkDatasetTestUtils.getInternalRowWithError;
-import static org.apache.hudi.testutils.SparkDatasetTestUtils.getRandomRows;
-import static org.apache.hudi.testutils.SparkDatasetTestUtils.toInternalRows;
-import static org.junit.jupiter.api.Assertions.fail;
-
-/**
- * Unit tests {@link HoodieBulkInsertDataInternalWriter}.
- */
-class TestHoodieBulkInsertDataInternalWriter extends
- HoodieBulkInsertInternalWriterTestBase {
-
- private static Stream<Arguments> configParams() {
- Object[][] data = new Object[][] {
- {true, true},
- {true, false},
- {false, true},
- {false, false}
- };
- return Stream.of(data).map(Arguments::of);
- }
-
- @ParameterizedTest
- @MethodSource("configParams")
-  void testDataInternalWriter(boolean sorted, boolean populateMetaFields) throws Exception {
- // init config and table
- HoodieWriteConfig cfg = getWriteConfig(populateMetaFields);
- HoodieTable table = HoodieSparkTable.create(cfg, context, metaClient);
- // execute N rounds
- for (int i = 0; i < 2; i++) {
- String instantTime = "00" + i;
- // init writer
-      HoodieBulkInsertDataInternalWriter writer = new HoodieBulkInsertDataInternalWriter(table, cfg, instantTime, RANDOM.nextInt(100000),
-          RANDOM.nextLong(), STRUCT_TYPE, populateMetaFields, sorted);
-
- int size = 10 + RANDOM.nextInt(1000);
-      // write N rows to partition1, N rows to partition2 and N rows to partition3 ... Each batch should create a new RowCreateHandle and a new file
- int batches = 3;
- Dataset<Row> totalInputRows = null;
-
- for (int j = 0; j < batches; j++) {
-        String partitionPath = HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS[j % 3];
-        Dataset<Row> inputRows = getRandomRows(sqlContext, size, partitionPath, false);
- writeRows(inputRows, writer);
- if (totalInputRows == null) {
- totalInputRows = inputRows;
- } else {
- totalInputRows = totalInputRows.union(inputRows);
- }
- }
-
-      HoodieWriterCommitMessage commitMetadata = (HoodieWriterCommitMessage) writer.commit();
- Option<List<String>> fileAbsPaths = Option.of(new ArrayList<>());
- Option<List<String>> fileNames = Option.of(new ArrayList<>());
-
- // verify write statuses
-      assertWriteStatuses(commitMetadata.getWriteStatuses(), batches, size, sorted, fileAbsPaths, fileNames, false);
-
- // verify rows
-      Dataset<Row> result = sqlContext.read().parquet(fileAbsPaths.get().toArray(new String[0]));
-      assertOutput(totalInputRows, result, instantTime, fileNames, populateMetaFields);
- }
- }
-
-
- /**
-   * Issue some corrupted or wrong schematized InternalRow after few valid InternalRows so that global error is thrown. write batch 1 of valid records write batch2 of invalid records which is expected
-   * to throw Global Error. Verify global error is set appropriately and only first batch of records are written to disk.
- */
- @Test
- void testGlobalFailure() throws Exception {
- // init config and table
- HoodieWriteConfig cfg = getWriteConfig(true);
- HoodieTable table = HoodieSparkTable.create(cfg, context, metaClient);
- String partitionPath = HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS[0];
-
- String instantTime = "001";
-    HoodieBulkInsertDataInternalWriter writer = new HoodieBulkInsertDataInternalWriter(table, cfg, instantTime, RANDOM.nextInt(100000),
-        RANDOM.nextLong(), STRUCT_TYPE, true, false);
-
- int size = 10 + RANDOM.nextInt(100);
- int totalFailures = 5;
- // Generate first batch of valid rows
-    Dataset<Row> inputRows = getRandomRows(sqlContext, size / 2, partitionPath, false);
- List<InternalRow> internalRows = toInternalRows(inputRows, ENCODER);
-
- // generate some failures rows
- for (int i = 0; i < totalFailures; i++) {
- internalRows.add(getInternalRowWithError(partitionPath));
- }
-
- // generate 2nd batch of valid rows
-    Dataset<Row> inputRows2 = getRandomRows(sqlContext, size / 2, partitionPath, false);
- internalRows.addAll(toInternalRows(inputRows2, ENCODER));
-
- // issue writes
- try {
- for (InternalRow internalRow : internalRows) {
- writer.write(internalRow);
- }
- fail("Should have failed");
- } catch (Throwable e) {
- // expected
- }
-
-    HoodieWriterCommitMessage commitMetadata = (HoodieWriterCommitMessage) writer.commit();
-
- Option<List<String>> fileAbsPaths = Option.of(new ArrayList<>());
- Option<List<String>> fileNames = Option.of(new ArrayList<>());
- // verify write statuses
-    assertWriteStatuses(commitMetadata.getWriteStatuses(), 1, size / 2, fileAbsPaths, fileNames);
-
- // verify rows
-    Dataset<Row> result = sqlContext.read().parquet(fileAbsPaths.get().toArray(new String[0]));
- assertOutput(inputRows, result, instantTime, fileNames, true);
- }
-
-  private void writeRows(Dataset<Row> inputRows, HoodieBulkInsertDataInternalWriter writer)
- throws Exception {
- List<InternalRow> internalRows = toInternalRows(inputRows, ENCODER);
- // issue writes
- for (InternalRow internalRow : internalRows) {
- writer.write(internalRow);
- }
- }
-}
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/spark/internal/TestHoodieDataSourceInternalBatchWrite.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/spark/internal/TestHoodieDataSourceInternalBatchWrite.java
deleted file mode 100644
index 3bc5619ddfbf..000000000000
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/spark/internal/TestHoodieDataSourceInternalBatchWrite.java
+++ /dev/null
@@ -1,346 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.spark.internal;
-
-import org.apache.hudi.DataSourceWriteOptions;
-import org.apache.hudi.client.SparkRDDWriteClient;
-import org.apache.hudi.client.common.HoodieSparkEngineContext;
-import org.apache.hudi.common.model.HoodieCommitMetadata;
-import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
-import org.apache.hudi.common.testutils.HoodieTestUtils;
-import org.apache.hudi.common.util.Option;
-import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.internal.HoodieBulkInsertInternalWriterTestBase;
-import org.apache.hudi.testutils.HoodieClientTestUtils;
-
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.sql.Dataset;
-import org.apache.spark.sql.Row;
-import org.apache.spark.sql.catalyst.InternalRow;
-import org.apache.spark.sql.connector.write.DataWriter;
-import org.junit.jupiter.api.Disabled;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.Arguments;
-import org.junit.jupiter.params.provider.MethodSource;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.stream.Stream;
-
-import static org.apache.hudi.testutils.SparkDatasetTestUtils.ENCODER;
-import static org.apache.hudi.testutils.SparkDatasetTestUtils.STRUCT_TYPE;
-import static org.apache.hudi.testutils.SparkDatasetTestUtils.getRandomRows;
-import static org.apache.hudi.testutils.SparkDatasetTestUtils.toInternalRows;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-
-/**
- * Unit tests {@link HoodieDataSourceInternalBatchWrite}.
- */
-class TestHoodieDataSourceInternalBatchWrite extends
- HoodieBulkInsertInternalWriterTestBase {
-
- private static Stream<Arguments> bulkInsertTypeParams() {
- Object[][] data = new Object[][] {
- {true},
- {false}
- };
- return Stream.of(data).map(Arguments::of);
- }
-
- @ParameterizedTest
- @MethodSource("bulkInsertTypeParams")
-  public void testDataSourceWriter(boolean populateMetaFields) throws Exception {
-    testDataSourceWriterInternal(Collections.emptyMap(), Collections.emptyMap(), populateMetaFields);
- }
-
-  private void testDataSourceWriterInternal(Map<String, String> extraMetadata, Map<String, String> expectedExtraMetadata, boolean populateMetaFields) throws Exception {
- // init config and table
- HoodieWriteConfig cfg = getWriteConfig(populateMetaFields);
- // init writer
- String instantTime;
-    try (SparkRDDWriteClient<?> writeClient = new SparkRDDWriteClient<>(new HoodieSparkEngineContext(new JavaSparkContext(sparkSession.sparkContext())), cfg)) {
- instantTime = writeClient.startCommit();
- }
-
- HoodieDataSourceInternalBatchWrite dataSourceInternalBatchWrite =
-        new HoodieDataSourceInternalBatchWrite(instantTime, cfg, STRUCT_TYPE, sqlContext.sparkSession(), storageConf, extraMetadata, populateMetaFields, false);
-    DataWriter<InternalRow> writer = dataSourceInternalBatchWrite.createBatchWriterFactory(null).createWriter(0, RANDOM.nextLong());
-
- String[] partitionPaths = HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS;
- List<String> partitionPathsAbs = new ArrayList<>();
- for (String partitionPath : partitionPaths) {
- partitionPathsAbs.add(basePath + "/" + partitionPath + "/*");
- }
-
- int size = 10 + RANDOM.nextInt(1000);
- int batches = 5;
- Dataset<Row> totalInputRows = null;
-
- for (int j = 0; j < batches; j++) {
-      String partitionPath = HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS[j % 3];
-      Dataset<Row> inputRows = getRandomRows(sqlContext, size, partitionPath, false);
- writeRows(inputRows, writer);
- if (totalInputRows == null) {
- totalInputRows = inputRows;
- } else {
- totalInputRows = totalInputRows.union(inputRows);
- }
- }
-
-    HoodieWriterCommitMessage commitMetadata = (HoodieWriterCommitMessage) writer.commit();
-    List<HoodieWriterCommitMessage> commitMessages = new ArrayList<>();
-    commitMessages.add(commitMetadata);
-    dataSourceInternalBatchWrite.commit(commitMessages.toArray(new HoodieWriterCommitMessage[0]));
-
- metaClient.reloadActiveTimeline();
- Dataset<Row> result = HoodieClientTestUtils.read(
-        jsc, basePath, sqlContext, metaClient.getStorage(), partitionPathsAbs.toArray(new String[0]));
-    // verify output
-    assertOutput(totalInputRows, result, instantTime, Option.empty(), populateMetaFields);
-    assertWriteStatuses(commitMessages.get(0).getWriteStatuses(), batches, size, Option.empty(), Option.empty());
-
- // verify extra metadata
- Option<HoodieCommitMetadata> commitMetadataOption =
- HoodieClientTestUtils.getCommitMetadataForLatestInstant(metaClient);
- assertTrue(commitMetadataOption.isPresent());
- Map<String, String> actualExtraMetadata = new HashMap<>();
-    commitMetadataOption.get().getExtraMetadata().entrySet().stream().filter(entry ->
-        !entry.getKey().equals(HoodieCommitMetadata.SCHEMA_KEY))
-        .forEach(entry -> actualExtraMetadata.put(entry.getKey(), entry.getValue()));
- assertEquals(actualExtraMetadata, expectedExtraMetadata);
- }
-
- @Test
- void testDataSourceWriterExtraCommitMetadata() throws Exception {
- String commitExtraMetaPrefix = "commit_extra_meta_";
- Map<String, String> extraMeta = new HashMap<>();
-    extraMeta.put(DataSourceWriteOptions.COMMIT_METADATA_KEYPREFIX().key(), commitExtraMetaPrefix);
-    extraMeta.put(commitExtraMetaPrefix + "a", "valA");
-    extraMeta.put(commitExtraMetaPrefix + "b", "valB");
-    extraMeta.put("commit_extra_c", "valC"); // should not be part of commit extra metadata
-
- Map<String, String> expectedMetadata = new HashMap<>();
- expectedMetadata.putAll(extraMeta);
-    expectedMetadata.remove(DataSourceWriteOptions.COMMIT_METADATA_KEYPREFIX().key());
- expectedMetadata.remove("commit_extra_c");
-
- testDataSourceWriterInternal(extraMeta, expectedMetadata, true);
- }
-
- @Test
- void testDataSourceWriterEmptyExtraCommitMetadata() throws Exception {
- String commitExtraMetaPrefix = "commit_extra_meta_";
- Map<String, String> extraMeta = new HashMap<>();
-    extraMeta.put(DataSourceWriteOptions.COMMIT_METADATA_KEYPREFIX().key(), commitExtraMetaPrefix);
- extraMeta.put("keyA", "valA");
- extraMeta.put("keyB", "valB");
- extraMeta.put("commit_extra_c", "valC");
- // none of the keys has commit metadata key prefix.
- testDataSourceWriterInternal(extraMeta, Collections.emptyMap(), true);
- }
-
- @ParameterizedTest
- @MethodSource("bulkInsertTypeParams")
-  public void testMultipleDataSourceWrites(boolean populateMetaFields) throws Exception {
- // init config and table
- HoodieWriteConfig cfg = getWriteConfig(populateMetaFields);
- int partitionCounter = 0;
-
- // execute N rounds
- for (int i = 0; i < 2; i++) {
- String instantTime = createInstant(cfg);
- // init writer
- HoodieDataSourceInternalBatchWrite dataSourceInternalBatchWrite =
-          new HoodieDataSourceInternalBatchWrite(instantTime, cfg, STRUCT_TYPE, sqlContext.sparkSession(), storageConf, Collections.emptyMap(), populateMetaFields, false);
-      List<HoodieWriterCommitMessage> commitMessages = new ArrayList<>();
-      Dataset<Row> totalInputRows = null;
-      DataWriter<InternalRow> writer = dataSourceInternalBatchWrite.createBatchWriterFactory(null).createWriter(partitionCounter++, RANDOM.nextLong());
-
- int size = 10 + RANDOM.nextInt(1000);
- int batches = 3; // one batch per partition
-
- for (int j = 0; j < batches; j++) {
-        String partitionPath = HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS[j % 3];
-        Dataset<Row> inputRows = getRandomRows(sqlContext, size, partitionPath, false);
- writeRows(inputRows, writer);
- if (totalInputRows == null) {
- totalInputRows = inputRows;
- } else {
- totalInputRows = totalInputRows.union(inputRows);
- }
- }
-
-      HoodieWriterCommitMessage commitMetadata = (HoodieWriterCommitMessage) writer.commit();
-      commitMessages.add(commitMetadata);
-      dataSourceInternalBatchWrite.commit(commitMessages.toArray(new HoodieWriterCommitMessage[0]));
- metaClient.reloadActiveTimeline();
-
-      Dataset<Row> result = HoodieClientTestUtils.readCommit(basePath, sqlContext, metaClient.getCommitTimeline(), instantTime, populateMetaFields, HoodieTestUtils.INSTANT_GENERATOR);
-
- // verify output
-      assertOutput(totalInputRows, result, instantTime, Option.empty(), populateMetaFields);
-      assertWriteStatuses(commitMessages.get(0).getWriteStatuses(), batches, size, Option.empty(), Option.empty());
- }
- }
-
-  // Large writes are not required to be executed w/ regular CI jobs. Takes lot of running time.
- @Disabled
- @ParameterizedTest
- @MethodSource("bulkInsertTypeParams")
- public void testLargeWrites(boolean populateMetaFields) throws Exception {
- // init config and table
- HoodieWriteConfig cfg = getWriteConfig(populateMetaFields);
- int partitionCounter = 0;
-
- // execute N rounds
- for (int i = 0; i < 3; i++) {
- String instantTime = createInstant(cfg);
- // init writer
- HoodieDataSourceInternalBatchWrite dataSourceInternalBatchWrite =
-          new HoodieDataSourceInternalBatchWrite(instantTime, cfg, STRUCT_TYPE, sqlContext.sparkSession(), storageConf, Collections.emptyMap(), populateMetaFields, false);
-      List<HoodieWriterCommitMessage> commitMessages = new ArrayList<>();
-      Dataset<Row> totalInputRows = null;
-      DataWriter<InternalRow> writer = dataSourceInternalBatchWrite.createBatchWriterFactory(null).createWriter(partitionCounter++, RANDOM.nextLong());
-
- int size = 10000 + RANDOM.nextInt(10000);
- int batches = 3; // one batch per partition
-
- for (int j = 0; j < batches; j++) {
-        String partitionPath = HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS[j % 3];
-        Dataset<Row> inputRows = getRandomRows(sqlContext, size, partitionPath, false);
- writeRows(inputRows, writer);
- if (totalInputRows == null) {
- totalInputRows = inputRows;
- } else {
- totalInputRows = totalInputRows.union(inputRows);
- }
- }
-
-      HoodieWriterCommitMessage commitMetadata = (HoodieWriterCommitMessage) writer.commit();
-      commitMessages.add(commitMetadata);
-      dataSourceInternalBatchWrite.commit(commitMessages.toArray(new HoodieWriterCommitMessage[0]));
- metaClient.reloadActiveTimeline();
-
-      Dataset<Row> result = HoodieClientTestUtils.readCommit(basePath, sqlContext, metaClient.getCommitTimeline(), instantTime,
- populateMetaFields, HoodieTestUtils.INSTANT_GENERATOR);
-
- // verify output
-      assertOutput(totalInputRows, result, instantTime, Option.empty(), populateMetaFields);
-      assertWriteStatuses(commitMessages.get(0).getWriteStatuses(), batches, size, Option.empty(), Option.empty());
- }
- }
-
- /**
-   * Tests that DataSourceWriter.abort() will abort the written records of interest write and commit batch1 write and abort batch2 Read of entire dataset should show only records from batch1.
- * commit batch1
- * abort batch2
- * verify only records from batch1 is available to read
- */
- @ParameterizedTest
- @MethodSource("bulkInsertTypeParams")
- public void testAbort(boolean populateMetaFields) throws Exception {
- // init config and table
- HoodieWriteConfig cfg = getWriteConfig(populateMetaFields);
- String instantTime0 = createInstant(cfg);
- // init writer
- HoodieDataSourceInternalBatchWrite dataSourceInternalBatchWrite =
-        new HoodieDataSourceInternalBatchWrite(instantTime0, cfg, STRUCT_TYPE, sqlContext.sparkSession(), storageConf, Collections.emptyMap(), populateMetaFields, false);
-    DataWriter<InternalRow> writer = dataSourceInternalBatchWrite.createBatchWriterFactory(null).createWriter(0, RANDOM.nextLong());
-
-    List<String> partitionPaths = Arrays.asList(HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS);
- List<String> partitionPathsAbs = new ArrayList<>();
- for (String partitionPath : partitionPaths) {
- partitionPathsAbs.add(basePath + "/" + partitionPath + "/*");
- }
-
- int size = 10 + RANDOM.nextInt(100);
- int batches = 1;
- Dataset<Row> totalInputRows = null;
-
- for (int j = 0; j < batches; j++) {
-      String partitionPath = HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS[j % 3];
-      Dataset<Row> inputRows = getRandomRows(sqlContext, size, partitionPath, false);
- writeRows(inputRows, writer);
- if (totalInputRows == null) {
- totalInputRows = inputRows;
- } else {
- totalInputRows = totalInputRows.union(inputRows);
- }
- }
-
-    HoodieWriterCommitMessage commitMetadata = (HoodieWriterCommitMessage) writer.commit();
-    List<HoodieWriterCommitMessage> commitMessages = new ArrayList<>();
-    commitMessages.add(commitMetadata);
-    // commit 1st batch
-    dataSourceInternalBatchWrite.commit(commitMessages.toArray(new HoodieWriterCommitMessage[0]));
- metaClient.reloadActiveTimeline();
- Dataset<Row> result = HoodieClientTestUtils.read(
-        jsc, basePath, sqlContext, metaClient.getStorage(), partitionPathsAbs.toArray(new String[0]));
-    // verify rows
-    assertOutput(totalInputRows, result, instantTime0, Option.empty(), populateMetaFields);
-    assertWriteStatuses(commitMessages.get(0).getWriteStatuses(), batches, size, Option.empty(), Option.empty());
-
- // 2nd batch. abort in the end
- String instantTime1 = createInstant(cfg);
- dataSourceInternalBatchWrite =
-        new HoodieDataSourceInternalBatchWrite(instantTime1, cfg, STRUCT_TYPE, sqlContext.sparkSession(), storageConf,
-            Collections.emptyMap(), populateMetaFields, false);
-    writer = dataSourceInternalBatchWrite.createBatchWriterFactory(null).createWriter(1, RANDOM.nextLong());
-
- for (int j = 0; j < batches; j++) {
-      String partitionPath = HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS[j % 3];
-      Dataset<Row> inputRows = getRandomRows(sqlContext, size, partitionPath, false);
- writeRows(inputRows, writer);
- }
-
- commitMetadata = (HoodieWriterCommitMessage) writer.commit();
- commitMessages = new ArrayList<>();
- commitMessages.add(commitMetadata);
- // commit 1st batch
-    dataSourceInternalBatchWrite.abort(commitMessages.toArray(new HoodieWriterCommitMessage[0]));
- metaClient.reloadActiveTimeline();
- result = HoodieClientTestUtils.read(
-        jsc, basePath, sqlContext, metaClient.getStorage(), partitionPathsAbs.toArray(new String[0]));
- // verify rows
- // only rows from first batch should be present
-    assertOutput(totalInputRows, result, instantTime0, Option.empty(), populateMetaFields);
- }
-
-  private void writeRows(Dataset<Row> inputRows, DataWriter<InternalRow> writer) throws Exception {
- List<InternalRow> internalRows = toInternalRows(inputRows, ENCODER);
- // issue writes
- for (InternalRow internalRow : internalRows) {
- writer.write(internalRow);
- }
- }
-
- private String createInstant(HoodieWriteConfig cfg) {
- String instantTime;
-    try (SparkRDDWriteClient<?> writeClient = new SparkRDDWriteClient<>(new HoodieSparkEngineContext(new JavaSparkContext(sparkSession.sparkContext())), cfg)) {
- instantTime = writeClient.startCommit();
- }
- return instantTime;
- }
-}