ashwinb1998 commented on a change in pull request #69:
URL: https://github.com/apache/phoenix-connectors/pull/69#discussion_r797455273
##########
File path: phoenix-spark-base/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixBatchWrite.java
##########
@@ -17,45 +17,35 @@
  */
 package org.apache.phoenix.spark.datasource.v2.writer;
 
-import org.apache.phoenix.util.PhoenixRuntime;
-import org.apache.spark.sql.SaveMode;
-import org.apache.spark.sql.catalyst.InternalRow;
-import org.apache.spark.sql.sources.v2.DataSourceOptions;
-import org.apache.spark.sql.sources.v2.writer.DataSourceWriter;
-import org.apache.spark.sql.sources.v2.writer.DataWriterFactory;
-import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage;
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.spark.sql.connector.write.BatchWrite;
+import org.apache.spark.sql.connector.write.DataWriterFactory;
+import org.apache.spark.sql.connector.write.LogicalWriteInfo;
+import org.apache.spark.sql.connector.write.PhysicalWriteInfo;
+import org.apache.spark.sql.connector.write.WriterCommitMessage;
 import org.apache.spark.sql.types.StructType;
 
+import java.util.Map;
+
 import static org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.CURRENT_SCN_VALUE;
-import static org.apache.phoenix.spark.datasource.v2.PhoenixDataSource.SKIP_NORMALIZING_IDENTIFIER;
+import static org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.MAPREDUCE_TENANT_ID;
 import static org.apache.phoenix.spark.datasource.v2.PhoenixDataSource.ZOOKEEPER_URL;
+import static org.apache.phoenix.spark.datasource.v2.PhoenixDataSource.SKIP_NORMALIZING_IDENTIFIER;
 import static org.apache.phoenix.spark.datasource.v2.PhoenixDataSource.extractPhoenixHBaseConfFromOptions;
 
-public class PhoenixDataSourceWriter implements DataSourceWriter {
+public class PhoenixBatchWrite implements BatchWrite {
 
     private final PhoenixDataSourceWriteOptions options;
+    private final LogicalWriteInfo writeInfo;
 
-    public PhoenixDataSourceWriter(SaveMode mode, StructType schema, DataSourceOptions options) {
-        if (!mode.equals(SaveMode.Overwrite)) {
-            throw new RuntimeException("SaveMode other than SaveMode.OverWrite is not supported");
-        }
-        if (!options.tableName().isPresent()) {
-            throw new RuntimeException("No Phoenix option " + DataSourceOptions.TABLE_KEY + " defined");
-        }
-        if (!options.get(ZOOKEEPER_URL).isPresent()) {
-            throw new RuntimeException("No Phoenix option " + ZOOKEEPER_URL + " defined");
-        }
-        this.options = createPhoenixDataSourceWriteOptions(options, schema);
-    }
-
-    @Override
-    public DataWriterFactory<InternalRow> createWriterFactory() {
-        return new PhoenixDataWriterFactory(options);
+    PhoenixBatchWrite(LogicalWriteInfo writeInfo, Map<String,String> options) {
+        this.writeInfo = writeInfo;
+        this.options = createPhoenixDataSourceWriteOptions(options, writeInfo.schema());
     }
 
     @Override
-    public boolean useCommitCoordinator() {

Review comment:
       It basically returns whether Spark should use the commit coordinator to ensure that at most one task attempt per partition commits. The override has now been removed as part of the DSV2 migration to Spark 3.
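       For context, here is a minimal sketch of where that hook sits in the Spark 3 connector API. The class name and method bodies below are illustrative only and not part of this PR; if I read the Spark 3 sources right, `BatchWrite` still declares `useCommitCoordinator()` as a default method returning `true`, so simply dropping the override keeps the coordinated-commit behavior.

       ```java
       import org.apache.spark.sql.connector.write.BatchWrite;
       import org.apache.spark.sql.connector.write.DataWriterFactory;
       import org.apache.spark.sql.connector.write.PhysicalWriteInfo;
       import org.apache.spark.sql.connector.write.WriterCommitMessage;

       // Illustrative skeleton only (not Phoenix code): shows the
       // commit-coordinator hook in the Spark 3 BatchWrite interface.
       public class CommitCoordinatorSketch implements BatchWrite {

           @Override
           public DataWriterFactory createBatchWriterFactory(PhysicalWriteInfo info) {
               // A real implementation returns a factory that creates one
               // DataWriter per partition; omitted in this sketch.
               throw new UnsupportedOperationException("sketch only");
           }

           // useCommitCoordinator() is a default method on BatchWrite (assumed
           // to default to true in Spark 3), so an implementation only needs
           // to override it to opt *out* of the coordinator:
           // @Override
           // public boolean useCommitCoordinator() { return false; }

           @Override
           public void commit(WriterCommitMessage[] messages) {
               // Runs on the driver after all partitions commit; with the
               // coordinator on, at most one task attempt per partition
               // contributes a message here.
           }

           @Override
           public void abort(WriterCommitMessage[] messages) {
               // Runs on the driver if the write job fails; clean up any
               // partially written data.
           }
       }
       ```

       Without the coordinator, a retried or speculative task attempt could commit alongside the original, and `commit()` would have to deduplicate per-partition messages itself.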