This is an automated email from the ASF dual-hosted git repository.

wombatukun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new dfe322094005 refactor: Remove not used classes from `org.apache.hudi.spark.internal` (#18211)
dfe322094005 is described below

commit dfe322094005cd820699a7459ff8351bebccbb67
Author: Geser Dugarov <[email protected]>
AuthorDate: Tue Feb 17 09:04:19 2026 +0700

    refactor: Remove not used classes from `org.apache.hudi.spark.internal` (#18211)
---
 .../apache/hudi/internal/BaseDefaultSource.java    |  49 ---
 .../hudi/internal/BaseWriterCommitMessage.java     |  39 ---
 .../internal/DataSourceInternalWriterHelper.java   | 111 -------
 .../apache/hudi/spark/internal/DefaultSource.java  |  66 ----
 .../HoodieBulkInsertDataInternalWriter.java        |  79 -----
 .../HoodieBulkInsertDataInternalWriterFactory.java |  57 ----
 .../HoodieDataSourceInternalBatchWrite.java        |  97 ------
 .../HoodieDataSourceInternalBatchWriteBuilder.java |  64 ----
 .../internal/HoodieDataSourceInternalTable.java    |  87 ------
 .../spark/internal/HoodieWriterCommitMessage.java  |  37 ---
 .../TestHoodieBulkInsertDataInternalWriter.java    | 166 ----------
 .../TestHoodieDataSourceInternalBatchWrite.java    | 346 ---------------------
 12 files changed, 1198 deletions(-)

diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/internal/BaseDefaultSource.java b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/internal/BaseDefaultSource.java
deleted file mode 100644
index 9d2bcec94385..000000000000
--- a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/internal/BaseDefaultSource.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.internal;
-
-import org.apache.hudi.hadoop.fs.HadoopFSUtils;
-import org.apache.hudi.storage.StorageConfiguration;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.spark.sql.SparkSession;
-
-/**
- * Base class for DefaultSource used by Spark datasource v2.
- */
-public class BaseDefaultSource {
-
-  protected SparkSession sparkSession = null;
-  protected StorageConfiguration<Configuration> configuration = null;
-
-  protected SparkSession getSparkSession() {
-    if (sparkSession == null) {
-      sparkSession = SparkSession.builder().getOrCreate();
-    }
-    return sparkSession;
-  }
-
-  protected StorageConfiguration<Configuration> getConfiguration() {
-    if (configuration == null) {
-      this.configuration = HadoopFSUtils.getStorageConf(
-          getSparkSession().sparkContext().hadoopConfiguration());
-    }
-    return configuration;
-  }
-}
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/internal/BaseWriterCommitMessage.java b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/internal/BaseWriterCommitMessage.java
deleted file mode 100644
index 1ec84269b6b7..000000000000
--- a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/internal/BaseWriterCommitMessage.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.internal;
-
-import org.apache.hudi.client.WriteStatus;
-
-import lombok.AllArgsConstructor;
-import lombok.Getter;
-import lombok.ToString;
-
-import java.io.Serializable;
-import java.util.List;
-
-/**
- * Base class for HoodieWriterCommitMessage used by Spark datasource v2.
- */
-@AllArgsConstructor
-@Getter
-@ToString
-public class BaseWriterCommitMessage implements Serializable {
-
-  private final List<WriteStatus> writeStatuses;
-}
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/internal/DataSourceInternalWriterHelper.java b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/internal/DataSourceInternalWriterHelper.java
deleted file mode 100644
index 902235a2acbd..000000000000
--- a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/internal/DataSourceInternalWriterHelper.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.internal;
-
-import org.apache.hudi.client.SparkRDDWriteClient;
-import org.apache.hudi.client.TableWriteStats;
-import org.apache.hudi.client.WriteStatus;
-import org.apache.hudi.client.common.HoodieSparkEngineContext;
-import org.apache.hudi.common.model.HoodieWriteStat;
-import org.apache.hudi.common.model.WriteOperationType;
-import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.table.timeline.HoodieInstant.State;
-import org.apache.hudi.common.util.CommitUtils;
-import org.apache.hudi.common.util.Option;
-import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.exception.HoodieException;
-import org.apache.hudi.storage.StorageConfiguration;
-import org.apache.hudi.table.HoodieTable;
-
-import lombok.Getter;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.sql.SparkSession;
-import org.apache.spark.sql.types.StructType;
-
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.stream.Collectors;
-
-/**
- * Helper class for HoodieDataSourceInternalWriter used by Spark datasource v2.
- */
-@Slf4j
-public class DataSourceInternalWriterHelper {
-
-  public static final String INSTANT_TIME_OPT_KEY = "hoodie.instant.time";
-
-  private final String instantTime;
-  private final HoodieTableMetaClient metaClient;
-  private final SparkRDDWriteClient writeClient;
-  @Getter
-  private final HoodieTable hoodieTable;
-  @Getter
-  private final WriteOperationType writeOperationType;
-  private final Map<String, String> extraMetadata;
-
-  public DataSourceInternalWriterHelper(String instantTime, HoodieWriteConfig writeConfig, StructType structType,
-                                        SparkSession sparkSession, StorageConfiguration<?> storageConf, Map<String, String> extraMetadata) {
-    this.instantTime = instantTime;
-    this.writeOperationType = WriteOperationType.BULK_INSERT;
-    this.extraMetadata = extraMetadata;
-    this.writeClient = new SparkRDDWriteClient<>(new HoodieSparkEngineContext(new JavaSparkContext(sparkSession.sparkContext())), writeConfig);
-    this.writeClient.setOperationType(writeOperationType);
-    this.hoodieTable = this.writeClient.initTable(writeOperationType, Option.of(instantTime));
-
-    this.metaClient = HoodieTableMetaClient.builder()
-        .setConf(storageConf.newInstance()).setBasePath(writeConfig.getBasePath()).build();
-    this.writeClient.validateAgainstTableProperties(this.metaClient.getTableConfig(), writeConfig);
-    this.writeClient.preWrite(instantTime, WriteOperationType.BULK_INSERT, metaClient);
-  }
-
-  public boolean useCommitCoordinator() {
-    return true;
-  }
-
-  public void onDataWriterCommit(String message) {
-    log.info("Received commit of a data writer = {}", message);
-  }
-
-  public void commit(List<WriteStatus> writeStatuses) {
-    try {
-      List<HoodieWriteStat> writeStatList = writeStatuses.stream().map(WriteStatus::getStat).collect(Collectors.toList());
-      writeClient.commitStats(instantTime, new TableWriteStats(writeStatList), Option.of(extraMetadata),
-          CommitUtils.getCommitActionType(writeOperationType, metaClient.getTableType()), Collections.emptyMap(), Option.empty(),
-          true, Option.empty());
-    } catch (Exception ioe) {
-      throw new HoodieException(ioe.getMessage(), ioe);
-    } finally {
-      writeClient.close();
-    }
-  }
-
-  public void abort() {
-    log.error("Commit {} aborted ", instantTime);
-    writeClient.close();
-  }
-
-  public String createInflightCommit() {
-    metaClient.getActiveTimeline().transitionRequestedToInflight(
-        metaClient.createNewInstant(State.REQUESTED,
-            CommitUtils.getCommitActionType(writeOperationType, metaClient.getTableType()), instantTime), Option.empty());
-    return instantTime;
-  }
-}
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/spark/internal/DefaultSource.java b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/spark/internal/DefaultSource.java
deleted file mode 100644
index 1e98ba781e2b..000000000000
--- a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/spark/internal/DefaultSource.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.spark.internal;
-
-import org.apache.hudi.DataSourceUtils;
-import org.apache.hudi.common.table.HoodieTableConfig;
-import org.apache.hudi.config.HoodieInternalConfig;
-import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.internal.BaseDefaultSource;
-import org.apache.hudi.internal.DataSourceInternalWriterHelper;
-
-import org.apache.spark.sql.HoodieDataTypeUtils;
-import org.apache.spark.sql.connector.catalog.Table;
-import org.apache.spark.sql.connector.catalog.TableProvider;
-import org.apache.spark.sql.connector.expressions.Transform;
-import org.apache.spark.sql.types.StructType;
-import org.apache.spark.sql.util.CaseInsensitiveStringMap;
-
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * DataSource V2 implementation for managing internal write logic. Only called internally.
- * This class is only compatible with datasource V2 API in Spark 3.
- */
-public class DefaultSource extends BaseDefaultSource implements TableProvider {
-
-  @Override
-  public StructType inferSchema(CaseInsensitiveStringMap options) {
-    String jsonSchema = options.get(HoodieInternalConfig.BULKINSERT_INPUT_DATA_SCHEMA_DDL.key());
-    return HoodieDataTypeUtils.parseStructTypeFromJson(jsonSchema);
-  }
-
-  @Override
-  public Table getTable(StructType schema, Transform[] partitioning, Map<String, String> properties) {
-    String instantTime = properties.get(DataSourceInternalWriterHelper.INSTANT_TIME_OPT_KEY);
-    String path = properties.get("path");
-    String tblName = properties.get(HoodieWriteConfig.TBL_NAME.key());
-    boolean populateMetaFields = Boolean.parseBoolean(properties.getOrDefault(HoodieTableConfig.POPULATE_META_FIELDS.key(),
-        Boolean.toString(HoodieTableConfig.POPULATE_META_FIELDS.defaultValue())));
-    boolean arePartitionRecordsSorted = Boolean.parseBoolean(properties.getOrDefault(HoodieInternalConfig.BULKINSERT_ARE_PARTITIONER_RECORDS_SORTED,
-        Boolean.toString(HoodieInternalConfig.DEFAULT_BULKINSERT_ARE_PARTITIONER_RECORDS_SORTED)));
-    // Create a new map as the properties is an unmodifiableMap on Spark 3.2.0
-    Map<String, String> newProps = new HashMap<>(properties);
-    // 1st arg to createHoodieConfig is not really required to be set. but passing it anyways.
-    HoodieWriteConfig config = DataSourceUtils.createHoodieConfig(newProps.get(HoodieWriteConfig.AVRO_SCHEMA_STRING.key()), path, tblName, newProps);
-    return new HoodieDataSourceInternalTable(instantTime, config, schema, getSparkSession(),
-        getConfiguration(), newProps, populateMetaFields, arePartitionRecordsSorted);
-  }
-}
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/spark/internal/HoodieBulkInsertDataInternalWriter.java b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/spark/internal/HoodieBulkInsertDataInternalWriter.java
deleted file mode 100644
index ff0e00371d59..000000000000
--- a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/spark/internal/HoodieBulkInsertDataInternalWriter.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.spark.internal;
-
-import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.index.HoodieIndex;
-import org.apache.hudi.table.HoodieTable;
-import org.apache.hudi.table.action.commit.BucketBulkInsertDataInternalWriterHelper;
-import org.apache.hudi.table.action.commit.BulkInsertDataInternalWriterHelper;
-import org.apache.hudi.table.action.commit.ConsistentBucketBulkInsertDataInternalWriterHelper;
-
-import org.apache.spark.sql.catalyst.InternalRow;
-import org.apache.spark.sql.connector.write.DataWriter;
-import org.apache.spark.sql.connector.write.WriterCommitMessage;
-import org.apache.spark.sql.types.StructType;
-
-import java.io.IOException;
-
-/**
- * Hoodie's Implementation of {@link DataWriter<InternalRow>}. This is used in data source "hudi.spark.internal" implementation for bulk insert.
- */
-public class HoodieBulkInsertDataInternalWriter implements DataWriter<InternalRow> {
-
-  private final BulkInsertDataInternalWriterHelper bulkInsertWriterHelper;
-
-  public HoodieBulkInsertDataInternalWriter(HoodieTable hoodieTable, HoodieWriteConfig writeConfig,
-                                            String instantTime, int taskPartitionId, long taskId, StructType structType, boolean populateMetaFields,
-                                            boolean arePartitionRecordsSorted) {
-
-    if (writeConfig.getIndexType() == HoodieIndex.IndexType.BUCKET) {
-      if (writeConfig.getBucketIndexEngineType() == HoodieIndex.BucketIndexEngineType.SIMPLE) {
-        this.bulkInsertWriterHelper = new BucketBulkInsertDataInternalWriterHelper(hoodieTable,
-            writeConfig, instantTime, taskPartitionId, taskId, 0, structType, populateMetaFields, arePartitionRecordsSorted);
-      } else {
-        this.bulkInsertWriterHelper = new ConsistentBucketBulkInsertDataInternalWriterHelper(hoodieTable,
-            writeConfig, instantTime, taskPartitionId, taskId, 0, structType, populateMetaFields, arePartitionRecordsSorted);
-      }
-    } else {
-      this.bulkInsertWriterHelper = new BulkInsertDataInternalWriterHelper(hoodieTable,
-          writeConfig, instantTime, taskPartitionId, taskId, 0, structType, populateMetaFields, arePartitionRecordsSorted);
-    }
-  }
-
-  @Override
-  public void write(InternalRow record) throws IOException {
-    bulkInsertWriterHelper.write(record);
-  }
-
-  @Override
-  public WriterCommitMessage commit() throws IOException {
-    return new HoodieWriterCommitMessage(bulkInsertWriterHelper.getWriteStatuses());
-  }
-
-  @Override
-  public void abort() {
-    bulkInsertWriterHelper.abort();
-  }
-
-  @Override
-  public void close() throws IOException {
-    bulkInsertWriterHelper.close();
-  }
-}
\ No newline at end of file
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/spark/internal/HoodieBulkInsertDataInternalWriterFactory.java b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/spark/internal/HoodieBulkInsertDataInternalWriterFactory.java
deleted file mode 100644
index 7704b07b984e..000000000000
--- a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/spark/internal/HoodieBulkInsertDataInternalWriterFactory.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.spark.internal;
-
-import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.table.HoodieTable;
-
-import org.apache.spark.sql.catalyst.InternalRow;
-import org.apache.spark.sql.connector.write.DataWriter;
-import org.apache.spark.sql.connector.write.DataWriterFactory;
-import org.apache.spark.sql.types.StructType;
-
-/**
- * Factory to assist in instantiating {@link HoodieBulkInsertDataInternalWriter}.
- */
-public class HoodieBulkInsertDataInternalWriterFactory implements DataWriterFactory {
-
-  private final String instantTime;
-  private final HoodieTable hoodieTable;
-  private final HoodieWriteConfig writeConfig;
-  private final StructType structType;
-  private final boolean populateMetaFields;
-  private final boolean arePartitionRecordsSorted;
-
-  public HoodieBulkInsertDataInternalWriterFactory(HoodieTable hoodieTable, HoodieWriteConfig writeConfig,
-                                                   String instantTime, StructType structType, boolean populateMetaFields,
-                                                   boolean arePartitionRecordsSorted) {
-    this.hoodieTable = hoodieTable;
-    this.writeConfig = writeConfig;
-    this.instantTime = instantTime;
-    this.structType = structType;
-    this.populateMetaFields = populateMetaFields;
-    this.arePartitionRecordsSorted = arePartitionRecordsSorted;
-  }
-
-  @Override
-  public DataWriter<InternalRow> createWriter(int partitionId, long taskId) {
-    return new HoodieBulkInsertDataInternalWriter(hoodieTable, writeConfig, instantTime, partitionId, taskId,
-        structType, populateMetaFields, arePartitionRecordsSorted);
-  }
-}
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/spark/internal/HoodieDataSourceInternalBatchWrite.java b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/spark/internal/HoodieDataSourceInternalBatchWrite.java
deleted file mode 100644
index 551cc340ed51..000000000000
--- a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/spark/internal/HoodieDataSourceInternalBatchWrite.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.spark.internal;
-
-import org.apache.hudi.DataSourceUtils;
-import org.apache.hudi.client.WriteStatus;
-import org.apache.hudi.common.model.WriteOperationType;
-import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.internal.DataSourceInternalWriterHelper;
-import org.apache.hudi.storage.StorageConfiguration;
-
-import org.apache.spark.sql.SparkSession;
-import org.apache.spark.sql.connector.write.BatchWrite;
-import org.apache.spark.sql.connector.write.DataWriterFactory;
-import org.apache.spark.sql.connector.write.PhysicalWriteInfo;
-import org.apache.spark.sql.connector.write.WriterCommitMessage;
-import org.apache.spark.sql.types.StructType;
-
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.stream.Collectors;
-
-/**
- * Implementation of {@link BatchWrite} for datasource "hudi.spark.internal" to be used in datasource implementation
- * of bulk insert.
- */
-public class HoodieDataSourceInternalBatchWrite implements BatchWrite {
-
-  private final HoodieWriteConfig writeConfig;
-  private final StructType structType;
-  private final boolean arePartitionRecordsSorted;
-  private final boolean populateMetaFields;
-  private final DataSourceInternalWriterHelper dataSourceInternalWriterHelper;
-  private Map<String, String> extraMetadata = new HashMap<>();
-
-  public HoodieDataSourceInternalBatchWrite(String instantTime, HoodieWriteConfig writeConfig, StructType structType,
-                                            SparkSession jss, StorageConfiguration<?> storageConf, Map<String, String> properties, boolean populateMetaFields, boolean arePartitionRecordsSorted) {
-    this.writeConfig = writeConfig;
-    this.structType = structType;
-    this.populateMetaFields = populateMetaFields;
-    this.arePartitionRecordsSorted = arePartitionRecordsSorted;
-    this.extraMetadata = DataSourceUtils.getExtraMetadata(properties);
-    this.dataSourceInternalWriterHelper = new DataSourceInternalWriterHelper(instantTime, writeConfig, structType,
-        jss, storageConf, extraMetadata);
-  }
-
-  @Override
-  public DataWriterFactory createBatchWriterFactory(PhysicalWriteInfo info) {
-    String instantTime = dataSourceInternalWriterHelper.createInflightCommit();
-    if (WriteOperationType.BULK_INSERT == dataSourceInternalWriterHelper.getWriteOperationType()) {
-      return new HoodieBulkInsertDataInternalWriterFactory(dataSourceInternalWriterHelper.getHoodieTable(),
-          writeConfig, instantTime, structType, populateMetaFields, arePartitionRecordsSorted);
-    } else {
-      throw new IllegalArgumentException("Write Operation Type + " + dataSourceInternalWriterHelper.getWriteOperationType() + " not supported ");
-    }
-  }
-
-  @Override
-  public boolean useCommitCoordinator() {
-    return dataSourceInternalWriterHelper.useCommitCoordinator();
-  }
-
-  @Override
-  public void onDataWriterCommit(WriterCommitMessage message) {
-    dataSourceInternalWriterHelper.onDataWriterCommit(message.toString());
-  }
-
-  @Override
-  public void commit(WriterCommitMessage[] messages) {
-    List<WriteStatus> writeStatuses = Arrays.stream(messages).map(m -> (HoodieWriterCommitMessage) m)
-        .flatMap(m -> m.getWriteStatuses().stream()).collect(Collectors.toList());
-    dataSourceInternalWriterHelper.commit(writeStatuses);
-  }
-
-  @Override
-  public void abort(WriterCommitMessage[] messages) {
-    dataSourceInternalWriterHelper.abort();
-  }
-}
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/spark/internal/HoodieDataSourceInternalBatchWriteBuilder.java b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/spark/internal/HoodieDataSourceInternalBatchWriteBuilder.java
deleted file mode 100644
index 204883036ac8..000000000000
--- a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/spark/internal/HoodieDataSourceInternalBatchWriteBuilder.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.spark.internal;
-
-import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.storage.StorageConfiguration;
-
-import org.apache.spark.sql.SparkSession;
-import org.apache.spark.sql.connector.write.BatchWrite;
-import org.apache.spark.sql.connector.write.WriteBuilder;
-import org.apache.spark.sql.types.StructType;
-
-import java.util.Map;
-
-/**
- * Implementation of {@link WriteBuilder} for datasource "hudi.spark.internal" to be used in datasource implementation
- * of bulk insert.
- */
-public class HoodieDataSourceInternalBatchWriteBuilder implements WriteBuilder {
-
-  private final String instantTime;
-  private final HoodieWriteConfig writeConfig;
-  private final StructType structType;
-  private final SparkSession jss;
-  private final StorageConfiguration<?> storageConf;
-  private final Map<String, String> properties;
-  private final boolean populateMetaFields;
-  private final boolean arePartitionRecordsSorted;
-
-  public HoodieDataSourceInternalBatchWriteBuilder(String instantTime, HoodieWriteConfig writeConfig, StructType structType,
-                                                   SparkSession jss, StorageConfiguration<?> storageConf, Map<String, String> properties, boolean populateMetaFields,
-                                                   boolean arePartitionRecordsSorted) {
-    this.instantTime = instantTime;
-    this.writeConfig = writeConfig;
-    this.structType = structType;
-    this.jss = jss;
-    this.storageConf = storageConf;
-    this.properties = properties;
-    this.populateMetaFields = populateMetaFields;
-    this.arePartitionRecordsSorted = arePartitionRecordsSorted;
-  }
-
-  @Override
-  public BatchWrite buildForBatch() {
-    return new HoodieDataSourceInternalBatchWrite(instantTime, writeConfig, structType, jss,
-        storageConf, properties, populateMetaFields, arePartitionRecordsSorted);
-  }
-}
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/spark/internal/HoodieDataSourceInternalTable.java b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/spark/internal/HoodieDataSourceInternalTable.java
deleted file mode 100644
index f1acffeb8575..000000000000
--- a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/spark/internal/HoodieDataSourceInternalTable.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.spark.internal;
-
-import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.storage.StorageConfiguration;
-
-import org.apache.spark.sql.SparkSession;
-import org.apache.spark.sql.connector.catalog.SupportsWrite;
-import org.apache.spark.sql.connector.catalog.TableCapability;
-import org.apache.spark.sql.connector.write.LogicalWriteInfo;
-import org.apache.spark.sql.connector.write.WriteBuilder;
-import org.apache.spark.sql.types.StructType;
-
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-
-/**
- * Hoodie's Implementation of {@link SupportsWrite}. This is used in data source "hudi.spark.internal" implementation for bulk insert.
- */
-class HoodieDataSourceInternalTable implements SupportsWrite {
-
-  private final String instantTime;
-  private final HoodieWriteConfig writeConfig;
-  private final StructType structType;
-  private final SparkSession jss;
-  private final StorageConfiguration<?> storageConf;
-  private final boolean arePartitionRecordsSorted;
-  private final Map<String, String> properties;
-  private final boolean populateMetaFields;
-
-  public HoodieDataSourceInternalTable(String instantTime, HoodieWriteConfig config,
-                                       StructType schema, SparkSession jss, StorageConfiguration<?> storageConf, Map<String, String> properties,
-                                       boolean populateMetaFields, boolean arePartitionRecordsSorted) {
-    this.instantTime = instantTime;
-    this.writeConfig = config;
-    this.structType = schema;
-    this.jss = jss;
-    this.storageConf = storageConf;
-    this.properties = properties;
-    this.populateMetaFields = populateMetaFields;
-    this.arePartitionRecordsSorted = arePartitionRecordsSorted;
-  }
-
-  @Override
-  public String name() {
-    return this.getClass().toString();
-  }
-
-  @Override
-  public StructType schema() {
-    return structType;
-  }
-
-  @Override
-  public Set<TableCapability> capabilities() {
-    return new HashSet<TableCapability>() {
-      {
-        add(TableCapability.BATCH_WRITE);
-        add(TableCapability.TRUNCATE);
-      }
-    };
-  }
-
-  @Override
-  public WriteBuilder newWriteBuilder(LogicalWriteInfo logicalWriteInfo) {
-    return new HoodieDataSourceInternalBatchWriteBuilder(instantTime, writeConfig, structType, jss,
-        storageConf, properties, populateMetaFields, arePartitionRecordsSorted);
-  }
-}
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/spark/internal/HoodieWriterCommitMessage.java b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/spark/internal/HoodieWriterCommitMessage.java
deleted file mode 100644
index a955d68accba..000000000000
--- a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/spark/internal/HoodieWriterCommitMessage.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.spark.internal;
-
-import org.apache.hudi.client.WriteStatus;
-import org.apache.hudi.internal.BaseWriterCommitMessage;
-
-import org.apache.spark.sql.connector.write.WriterCommitMessage;
-
-import java.util.List;
-
-/**
- * Hoodie's {@link WriterCommitMessage} used in datasource "hudi.spark.internal" implementation.
- */
-public class HoodieWriterCommitMessage extends BaseWriterCommitMessage
-    implements WriterCommitMessage {
-
-  public HoodieWriterCommitMessage(List<WriteStatus> writeStatuses) {
-    super(writeStatuses);
-  }
-}
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/spark/internal/TestHoodieBulkInsertDataInternalWriter.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/spark/internal/TestHoodieBulkInsertDataInternalWriter.java
deleted file mode 100644
index 200e37f65773..000000000000
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/spark/internal/TestHoodieBulkInsertDataInternalWriter.java
+++ /dev/null
@@ -1,166 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.spark.internal;
-
-import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
-import org.apache.hudi.common.util.Option;
-import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.internal.HoodieBulkInsertInternalWriterTestBase;
-import org.apache.hudi.table.HoodieSparkTable;
-import org.apache.hudi.table.HoodieTable;
-
-import org.apache.spark.sql.Dataset;
-import org.apache.spark.sql.Row;
-import org.apache.spark.sql.catalyst.InternalRow;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.Arguments;
-import org.junit.jupiter.params.provider.MethodSource;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.stream.Stream;
-
-import static org.apache.hudi.testutils.SparkDatasetTestUtils.ENCODER;
-import static org.apache.hudi.testutils.SparkDatasetTestUtils.STRUCT_TYPE;
-import static org.apache.hudi.testutils.SparkDatasetTestUtils.getInternalRowWithError;
-import static org.apache.hudi.testutils.SparkDatasetTestUtils.getRandomRows;
-import static org.apache.hudi.testutils.SparkDatasetTestUtils.toInternalRows;
-import static org.junit.jupiter.api.Assertions.fail;
-
-/**
- * Unit tests {@link HoodieBulkInsertDataInternalWriter}.
- */
-class TestHoodieBulkInsertDataInternalWriter extends
-    HoodieBulkInsertInternalWriterTestBase {
-
-  private static Stream<Arguments> configParams() {
-    Object[][] data = new Object[][] {
-        {true, true},
-        {true, false},
-        {false, true},
-        {false, false}
-    };
-    return Stream.of(data).map(Arguments::of);
-  }
-
-  @ParameterizedTest
-  @MethodSource("configParams")
-  void testDataInternalWriter(boolean sorted, boolean populateMetaFields) throws Exception {
-    // init config and table
-    HoodieWriteConfig cfg = getWriteConfig(populateMetaFields);
-    HoodieTable table = HoodieSparkTable.create(cfg, context, metaClient);
-    // execute N rounds
-    for (int i = 0; i < 2; i++) {
-      String instantTime = "00" + i;
-      // init writer
-      HoodieBulkInsertDataInternalWriter writer = new HoodieBulkInsertDataInternalWriter(table, cfg, instantTime, RANDOM.nextInt(100000),
-          RANDOM.nextLong(), STRUCT_TYPE, populateMetaFields, sorted);
-
-      int size = 10 + RANDOM.nextInt(1000);
-      // write N rows to partition1, N rows to partition2 and N rows to partition3 ... Each batch should create a new RowCreateHandle and a new file
-      int batches = 3;
-      Dataset<Row> totalInputRows = null;
-
-      for (int j = 0; j < batches; j++) {
-        String partitionPath = HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS[j % 3];
-        Dataset<Row> inputRows = getRandomRows(sqlContext, size, partitionPath, false);
-        writeRows(inputRows, writer);
-        if (totalInputRows == null) {
-          totalInputRows = inputRows;
-        } else {
-          totalInputRows = totalInputRows.union(inputRows);
-        }
-      }
-
-      HoodieWriterCommitMessage commitMetadata = (HoodieWriterCommitMessage) writer.commit();
-      Option<List<String>> fileAbsPaths = Option.of(new ArrayList<>());
-      Option<List<String>> fileNames = Option.of(new ArrayList<>());
-
-      // verify write statuses
-      assertWriteStatuses(commitMetadata.getWriteStatuses(), batches, size, sorted, fileAbsPaths, fileNames, false);
-
-      // verify rows
-      Dataset<Row> result = sqlContext.read().parquet(fileAbsPaths.get().toArray(new String[0]));
-      assertOutput(totalInputRows, result, instantTime, fileNames, populateMetaFields);
-    }
-  }
-
-
-  /**
-   * Issue some corrupted or wrong schematized InternalRow after few valid InternalRows so that global error is thrown. write batch 1 of valid records write batch2 of invalid records which is expected
-   * to throw Global Error. Verify global error is set appropriately and only first batch of records are written to disk.
-   */
-  @Test
-  void testGlobalFailure() throws Exception {
-    // init config and table
-    HoodieWriteConfig cfg = getWriteConfig(true);
-    HoodieTable table = HoodieSparkTable.create(cfg, context, metaClient);
-    String partitionPath = HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS[0];
-
-    String instantTime = "001";
-    HoodieBulkInsertDataInternalWriter writer = new HoodieBulkInsertDataInternalWriter(table, cfg, instantTime, RANDOM.nextInt(100000),
-        RANDOM.nextLong(), STRUCT_TYPE, true, false);
-
-    int size = 10 + RANDOM.nextInt(100);
-    int totalFailures = 5;
-    // Generate first batch of valid rows
-    Dataset<Row> inputRows = getRandomRows(sqlContext, size / 2, partitionPath, false);
-    List<InternalRow> internalRows = toInternalRows(inputRows, ENCODER);
-
-    // generate some failures rows
-    for (int i = 0; i < totalFailures; i++) {
-      internalRows.add(getInternalRowWithError(partitionPath));
-    }
-
-    // generate 2nd batch of valid rows
-    Dataset<Row> inputRows2 = getRandomRows(sqlContext, size / 2, partitionPath, false);
-    internalRows.addAll(toInternalRows(inputRows2, ENCODER));
-
-    // issue writes
-    try {
-      for (InternalRow internalRow : internalRows) {
-        writer.write(internalRow);
-      }
-      fail("Should have failed");
-    } catch (Throwable e) {
-      // expected
-    }
-
-    HoodieWriterCommitMessage commitMetadata = (HoodieWriterCommitMessage) writer.commit();
-
-    Option<List<String>> fileAbsPaths = Option.of(new ArrayList<>());
-    Option<List<String>> fileNames = Option.of(new ArrayList<>());
-    // verify write statuses
-    assertWriteStatuses(commitMetadata.getWriteStatuses(), 1, size / 2, fileAbsPaths, fileNames);
-
-    // verify rows
-    Dataset<Row> result = sqlContext.read().parquet(fileAbsPaths.get().toArray(new String[0]));
-    assertOutput(inputRows, result, instantTime, fileNames, true);
-  }
-
-  private void writeRows(Dataset<Row> inputRows, HoodieBulkInsertDataInternalWriter writer)
-      throws Exception {
-    List<InternalRow> internalRows = toInternalRows(inputRows, ENCODER);
-    // issue writes
-    for (InternalRow internalRow : internalRows) {
-      writer.write(internalRow);
-    }
-  }
-}
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/spark/internal/TestHoodieDataSourceInternalBatchWrite.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/spark/internal/TestHoodieDataSourceInternalBatchWrite.java
deleted file mode 100644
index 3bc5619ddfbf..000000000000
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/spark/internal/TestHoodieDataSourceInternalBatchWrite.java
+++ /dev/null
@@ -1,346 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.spark.internal;
-
-import org.apache.hudi.DataSourceWriteOptions;
-import org.apache.hudi.client.SparkRDDWriteClient;
-import org.apache.hudi.client.common.HoodieSparkEngineContext;
-import org.apache.hudi.common.model.HoodieCommitMetadata;
-import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
-import org.apache.hudi.common.testutils.HoodieTestUtils;
-import org.apache.hudi.common.util.Option;
-import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.internal.HoodieBulkInsertInternalWriterTestBase;
-import org.apache.hudi.testutils.HoodieClientTestUtils;
-
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.sql.Dataset;
-import org.apache.spark.sql.Row;
-import org.apache.spark.sql.catalyst.InternalRow;
-import org.apache.spark.sql.connector.write.DataWriter;
-import org.junit.jupiter.api.Disabled;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.Arguments;
-import org.junit.jupiter.params.provider.MethodSource;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.stream.Stream;
-
-import static org.apache.hudi.testutils.SparkDatasetTestUtils.ENCODER;
-import static org.apache.hudi.testutils.SparkDatasetTestUtils.STRUCT_TYPE;
-import static org.apache.hudi.testutils.SparkDatasetTestUtils.getRandomRows;
-import static org.apache.hudi.testutils.SparkDatasetTestUtils.toInternalRows;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-
-/**
- * Unit tests {@link HoodieDataSourceInternalBatchWrite}.
- */
-class TestHoodieDataSourceInternalBatchWrite extends
-    HoodieBulkInsertInternalWriterTestBase {
-
-  private static Stream<Arguments> bulkInsertTypeParams() {
-    Object[][] data = new Object[][] {
-        {true},
-        {false}
-    };
-    return Stream.of(data).map(Arguments::of);
-  }
-
-  @ParameterizedTest
-  @MethodSource("bulkInsertTypeParams")
-  public void testDataSourceWriter(boolean populateMetaFields) throws Exception {
-    testDataSourceWriterInternal(Collections.emptyMap(), Collections.emptyMap(), populateMetaFields);
-  }
-
-  private void testDataSourceWriterInternal(Map<String, String> extraMetadata, Map<String, String> expectedExtraMetadata, boolean populateMetaFields) throws Exception {
-    // init config and table
-    HoodieWriteConfig cfg = getWriteConfig(populateMetaFields);
-    // init writer
-    String instantTime;
-    try (SparkRDDWriteClient<?> writeClient = new SparkRDDWriteClient<>(new HoodieSparkEngineContext(new JavaSparkContext(sparkSession.sparkContext())), cfg)) {
-      instantTime = writeClient.startCommit();
-    }
-
-    HoodieDataSourceInternalBatchWrite dataSourceInternalBatchWrite =
-        new HoodieDataSourceInternalBatchWrite(instantTime, cfg, STRUCT_TYPE, sqlContext.sparkSession(), storageConf, extraMetadata, populateMetaFields, false);
-    DataWriter<InternalRow> writer = dataSourceInternalBatchWrite.createBatchWriterFactory(null).createWriter(0, RANDOM.nextLong());
-
-    String[] partitionPaths = HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS;
-    List<String> partitionPathsAbs = new ArrayList<>();
-    for (String partitionPath : partitionPaths) {
-      partitionPathsAbs.add(basePath + "/" + partitionPath + "/*");
-    }
-
-    int size = 10 + RANDOM.nextInt(1000);
-    int batches = 5;
-    Dataset<Row> totalInputRows = null;
-
-    for (int j = 0; j < batches; j++) {
-      String partitionPath = HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS[j % 3];
-      Dataset<Row> inputRows = getRandomRows(sqlContext, size, partitionPath, false);
-      writeRows(inputRows, writer);
-      if (totalInputRows == null) {
-        totalInputRows = inputRows;
-      } else {
-        totalInputRows = totalInputRows.union(inputRows);
-      }
-    }
-
-    HoodieWriterCommitMessage commitMetadata = (HoodieWriterCommitMessage) writer.commit();
-    List<HoodieWriterCommitMessage> commitMessages = new ArrayList<>();
-    commitMessages.add(commitMetadata);
-    dataSourceInternalBatchWrite.commit(commitMessages.toArray(new HoodieWriterCommitMessage[0]));
-
-    metaClient.reloadActiveTimeline();
-    Dataset<Row> result = HoodieClientTestUtils.read(
-        jsc, basePath, sqlContext, metaClient.getStorage(), partitionPathsAbs.toArray(new String[0]));
-    // verify output
-    assertOutput(totalInputRows, result, instantTime, Option.empty(), populateMetaFields);
-    assertWriteStatuses(commitMessages.get(0).getWriteStatuses(), batches, size, Option.empty(), Option.empty());
-
-    // verify extra metadata
-    Option<HoodieCommitMetadata> commitMetadataOption =
-        HoodieClientTestUtils.getCommitMetadataForLatestInstant(metaClient);
-    assertTrue(commitMetadataOption.isPresent());
-    Map<String, String> actualExtraMetadata = new HashMap<>();
-    commitMetadataOption.get().getExtraMetadata().entrySet().stream().filter(entry ->
-            !entry.getKey().equals(HoodieCommitMetadata.SCHEMA_KEY))
-        .forEach(entry -> actualExtraMetadata.put(entry.getKey(), entry.getValue()));
-    assertEquals(actualExtraMetadata, expectedExtraMetadata);
-  }
-
-  @Test
-  void testDataSourceWriterExtraCommitMetadata() throws Exception {
-    String commitExtraMetaPrefix = "commit_extra_meta_";
-    Map<String, String> extraMeta = new HashMap<>();
-    extraMeta.put(DataSourceWriteOptions.COMMIT_METADATA_KEYPREFIX().key(), commitExtraMetaPrefix);
-    extraMeta.put(commitExtraMetaPrefix + "a", "valA");
-    extraMeta.put(commitExtraMetaPrefix + "b", "valB");
-    extraMeta.put("commit_extra_c", "valC"); // should not be part of commit extra metadata
-
-    Map<String, String> expectedMetadata = new HashMap<>();
-    expectedMetadata.putAll(extraMeta);
-    expectedMetadata.remove(DataSourceWriteOptions.COMMIT_METADATA_KEYPREFIX().key());
-    expectedMetadata.remove("commit_extra_c");
-
-    testDataSourceWriterInternal(extraMeta, expectedMetadata, true);
-  }
-
-  @Test
-  void testDataSourceWriterEmptyExtraCommitMetadata() throws Exception {
-    String commitExtraMetaPrefix = "commit_extra_meta_";
-    Map<String, String> extraMeta = new HashMap<>();
-    extraMeta.put(DataSourceWriteOptions.COMMIT_METADATA_KEYPREFIX().key(), commitExtraMetaPrefix);
-    extraMeta.put("keyA", "valA");
-    extraMeta.put("keyB", "valB");
-    extraMeta.put("commit_extra_c", "valC");
-    // none of the keys has commit metadata key prefix.
-    testDataSourceWriterInternal(extraMeta, Collections.emptyMap(), true);
-  }
-
-  @ParameterizedTest
-  @MethodSource("bulkInsertTypeParams")
-  public void testMultipleDataSourceWrites(boolean populateMetaFields) throws Exception {
-    // init config and table
-    HoodieWriteConfig cfg = getWriteConfig(populateMetaFields);
-    int partitionCounter = 0;
-
-    // execute N rounds
-    for (int i = 0; i < 2; i++) {
-      String instantTime = createInstant(cfg);
-      // init writer
-      HoodieDataSourceInternalBatchWrite dataSourceInternalBatchWrite =
-          new HoodieDataSourceInternalBatchWrite(instantTime, cfg, STRUCT_TYPE, sqlContext.sparkSession(), storageConf, Collections.emptyMap(), populateMetaFields, false);
-      List<HoodieWriterCommitMessage> commitMessages = new ArrayList<>();
-      Dataset<Row> totalInputRows = null;
-      DataWriter<InternalRow> writer = dataSourceInternalBatchWrite.createBatchWriterFactory(null).createWriter(partitionCounter++, RANDOM.nextLong());
-
-      int size = 10 + RANDOM.nextInt(1000);
-      int batches = 3; // one batch per partition
-
-      for (int j = 0; j < batches; j++) {
-        String partitionPath = HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS[j % 3];
-        Dataset<Row> inputRows = getRandomRows(sqlContext, size, partitionPath, false);
-        writeRows(inputRows, writer);
-        if (totalInputRows == null) {
-          totalInputRows = inputRows;
-        } else {
-          totalInputRows = totalInputRows.union(inputRows);
-        }
-      }
-
-      HoodieWriterCommitMessage commitMetadata = (HoodieWriterCommitMessage) writer.commit();
-      commitMessages.add(commitMetadata);
-      dataSourceInternalBatchWrite.commit(commitMessages.toArray(new HoodieWriterCommitMessage[0]));
-      metaClient.reloadActiveTimeline();
-
-      Dataset<Row> result = HoodieClientTestUtils.readCommit(basePath, sqlContext, metaClient.getCommitTimeline(), instantTime, populateMetaFields, HoodieTestUtils.INSTANT_GENERATOR);
-
-      // verify output
-      assertOutput(totalInputRows, result, instantTime, Option.empty(), populateMetaFields);
-      assertWriteStatuses(commitMessages.get(0).getWriteStatuses(), batches, size, Option.empty(), Option.empty());
-    }
-  }
-
-  // Large writes are not required to be executed w/ regular CI jobs. Takes lot of running time.
-  @Disabled
-  @ParameterizedTest
-  @MethodSource("bulkInsertTypeParams")
-  public void testLargeWrites(boolean populateMetaFields) throws Exception {
-    // init config and table
-    HoodieWriteConfig cfg = getWriteConfig(populateMetaFields);
-    int partitionCounter = 0;
-
-    // execute N rounds
-    for (int i = 0; i < 3; i++) {
-      String instantTime = createInstant(cfg);
-      // init writer
-      HoodieDataSourceInternalBatchWrite dataSourceInternalBatchWrite =
-          new HoodieDataSourceInternalBatchWrite(instantTime, cfg, STRUCT_TYPE, sqlContext.sparkSession(), storageConf, Collections.emptyMap(), populateMetaFields, false);
-      List<HoodieWriterCommitMessage> commitMessages = new ArrayList<>();
-      Dataset<Row> totalInputRows = null;
-      DataWriter<InternalRow> writer = dataSourceInternalBatchWrite.createBatchWriterFactory(null).createWriter(partitionCounter++, RANDOM.nextLong());
-
-      int size = 10000 + RANDOM.nextInt(10000);
-      int batches = 3; // one batch per partition
-
-      for (int j = 0; j < batches; j++) {
-        String partitionPath = HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS[j % 3];
-        Dataset<Row> inputRows = getRandomRows(sqlContext, size, partitionPath, false);
-        writeRows(inputRows, writer);
-        if (totalInputRows == null) {
-          totalInputRows = inputRows;
-        } else {
-          totalInputRows = totalInputRows.union(inputRows);
-        }
-      }
-
-      HoodieWriterCommitMessage commitMetadata = (HoodieWriterCommitMessage) writer.commit();
-      commitMessages.add(commitMetadata);
-      dataSourceInternalBatchWrite.commit(commitMessages.toArray(new HoodieWriterCommitMessage[0]));
-      metaClient.reloadActiveTimeline();
-
-      Dataset<Row> result = HoodieClientTestUtils.readCommit(basePath, sqlContext, metaClient.getCommitTimeline(), instantTime,
-          populateMetaFields, HoodieTestUtils.INSTANT_GENERATOR);
-
-      // verify output
-      assertOutput(totalInputRows, result, instantTime, Option.empty(), populateMetaFields);
-      assertWriteStatuses(commitMessages.get(0).getWriteStatuses(), batches, size, Option.empty(), Option.empty());
-    }
-  }
-
-  /**
-   * Tests that DataSourceWriter.abort() will abort the written records of interest write and commit batch1 write and abort batch2 Read of entire dataset should show only records from batch1.
-   * commit batch1
-   * abort batch2
-   * verify only records from batch1 is available to read
-   */
-  @ParameterizedTest
-  @MethodSource("bulkInsertTypeParams")
-  public void testAbort(boolean populateMetaFields) throws Exception {
-    // init config and table
-    HoodieWriteConfig cfg = getWriteConfig(populateMetaFields);
-    String instantTime0 = createInstant(cfg);
-    // init writer
-    HoodieDataSourceInternalBatchWrite dataSourceInternalBatchWrite =
-        new HoodieDataSourceInternalBatchWrite(instantTime0, cfg, STRUCT_TYPE, sqlContext.sparkSession(), storageConf, Collections.emptyMap(), populateMetaFields, false);
-    DataWriter<InternalRow> writer = dataSourceInternalBatchWrite.createBatchWriterFactory(null).createWriter(0, RANDOM.nextLong());
-
-    List<String> partitionPaths = Arrays.asList(HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS);
-    List<String> partitionPathsAbs = new ArrayList<>();
-    for (String partitionPath : partitionPaths) {
-      partitionPathsAbs.add(basePath + "/" + partitionPath + "/*");
-    }
-
-    int size = 10 + RANDOM.nextInt(100);
-    int batches = 1;
-    Dataset<Row> totalInputRows = null;
-
-    for (int j = 0; j < batches; j++) {
-      String partitionPath = HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS[j % 3];
-      Dataset<Row> inputRows = getRandomRows(sqlContext, size, partitionPath, false);
-      writeRows(inputRows, writer);
-      if (totalInputRows == null) {
-        totalInputRows = inputRows;
-      } else {
-        totalInputRows = totalInputRows.union(inputRows);
-      }
-    }
-
-    HoodieWriterCommitMessage commitMetadata = (HoodieWriterCommitMessage) writer.commit();
-    List<HoodieWriterCommitMessage> commitMessages = new ArrayList<>();
-    commitMessages.add(commitMetadata);
-    // commit 1st batch
-    dataSourceInternalBatchWrite.commit(commitMessages.toArray(new HoodieWriterCommitMessage[0]));
-    metaClient.reloadActiveTimeline();
-    Dataset<Row> result = HoodieClientTestUtils.read(
-        jsc, basePath, sqlContext, metaClient.getStorage(), partitionPathsAbs.toArray(new String[0]));
-    // verify rows
-    assertOutput(totalInputRows, result, instantTime0, Option.empty(), populateMetaFields);
-    assertWriteStatuses(commitMessages.get(0).getWriteStatuses(), batches, size, Option.empty(), Option.empty());
-
-    // 2nd batch. abort in the end
-    String instantTime1 = createInstant(cfg);
-    dataSourceInternalBatchWrite =
-        new HoodieDataSourceInternalBatchWrite(instantTime1, cfg, STRUCT_TYPE, sqlContext.sparkSession(), storageConf,
-            Collections.emptyMap(), populateMetaFields, false);
-    writer = dataSourceInternalBatchWrite.createBatchWriterFactory(null).createWriter(1, RANDOM.nextLong());
-
-    for (int j = 0; j < batches; j++) {
-      String partitionPath = HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS[j % 3];
-      Dataset<Row> inputRows = getRandomRows(sqlContext, size, partitionPath, false);
-      writeRows(inputRows, writer);
-    }
-
-    commitMetadata = (HoodieWriterCommitMessage) writer.commit();
-    commitMessages = new ArrayList<>();
-    commitMessages.add(commitMetadata);
-    // commit 1st batch
-    dataSourceInternalBatchWrite.abort(commitMessages.toArray(new HoodieWriterCommitMessage[0]));
-    metaClient.reloadActiveTimeline();
-    result = HoodieClientTestUtils.read(
-        jsc, basePath, sqlContext, metaClient.getStorage(), partitionPathsAbs.toArray(new String[0]));
-    // verify rows
-    // only rows from first batch should be present
-    assertOutput(totalInputRows, result, instantTime0, Option.empty(), populateMetaFields);
-  }
-
-  private void writeRows(Dataset<Row> inputRows, DataWriter<InternalRow> writer) throws Exception {
-    List<InternalRow> internalRows = toInternalRows(inputRows, ENCODER);
-    // issue writes
-    for (InternalRow internalRow : internalRows) {
-      writer.write(internalRow);
-    }
-  }
-
-  private String createInstant(HoodieWriteConfig cfg) {
-    String instantTime;
-    try (SparkRDDWriteClient<?> writeClient = new SparkRDDWriteClient<>(new HoodieSparkEngineContext(new JavaSparkContext(sparkSession.sparkContext())), cfg)) {
-      instantTime = writeClient.startCommit();
-    }
-    return instantTime;
-  }
-}
