ashwinb1998 commented on a change in pull request #69:
URL: https://github.com/apache/phoenix-connectors/pull/69#discussion_r797455273



##########
File path: phoenix-spark-base/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixBatchWrite.java
##########
@@ -17,45 +17,35 @@
  */
 package org.apache.phoenix.spark.datasource.v2.writer;
 
-import org.apache.phoenix.util.PhoenixRuntime;
-import org.apache.spark.sql.SaveMode;
-import org.apache.spark.sql.catalyst.InternalRow;
-import org.apache.spark.sql.sources.v2.DataSourceOptions;
-import org.apache.spark.sql.sources.v2.writer.DataSourceWriter;
-import org.apache.spark.sql.sources.v2.writer.DataWriterFactory;
-import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage;
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.spark.sql.connector.write.BatchWrite;
+import org.apache.spark.sql.connector.write.DataWriterFactory;
+import org.apache.spark.sql.connector.write.LogicalWriteInfo;
+import org.apache.spark.sql.connector.write.PhysicalWriteInfo;
+import org.apache.spark.sql.connector.write.WriterCommitMessage;
 import org.apache.spark.sql.types.StructType;
 
+import java.util.Map;
+
 import static org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.CURRENT_SCN_VALUE;
-import static org.apache.phoenix.spark.datasource.v2.PhoenixDataSource.SKIP_NORMALIZING_IDENTIFIER;
+import static org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.MAPREDUCE_TENANT_ID;
 import static org.apache.phoenix.spark.datasource.v2.PhoenixDataSource.ZOOKEEPER_URL;
+import static org.apache.phoenix.spark.datasource.v2.PhoenixDataSource.SKIP_NORMALIZING_IDENTIFIER;
 import static org.apache.phoenix.spark.datasource.v2.PhoenixDataSource.extractPhoenixHBaseConfFromOptions;
 
-public class PhoenixDataSourceWriter implements DataSourceWriter {
+public class PhoenixBatchWrite implements BatchWrite {
 
     private final PhoenixDataSourceWriteOptions options;
+    private final LogicalWriteInfo writeInfo;
 
-    public PhoenixDataSourceWriter(SaveMode mode, StructType schema, DataSourceOptions options) {
-        if (!mode.equals(SaveMode.Overwrite)) {
-            throw new RuntimeException("SaveMode other than SaveMode.OverWrite is not supported");
-        }
-        if (!options.tableName().isPresent()) {
-            throw new RuntimeException("No Phoenix option " + DataSourceOptions.TABLE_KEY + " defined");
-        }
-        if (!options.get(ZOOKEEPER_URL).isPresent()) {
-            throw new RuntimeException("No Phoenix option " + ZOOKEEPER_URL + " defined");
-        }
-        this.options = createPhoenixDataSourceWriteOptions(options, schema);
-    }
-
-    @Override
-    public DataWriterFactory<InternalRow> createWriterFactory() {
-        return new PhoenixDataWriterFactory(options);
+    PhoenixBatchWrite(LogicalWriteInfo writeInfo, Map<String,String> options) {
+        this.writeInfo = writeInfo;
+        this.options = createPhoenixDataSourceWriteOptions(options, writeInfo.schema());
     }
 
     @Override
-    public boolean useCommitCoordinator() {

Review comment:
       `useCommitCoordinator()` indicates whether Spark should use the commit coordinator to ensure that at most one task per partition commits. The override has now been removed as part of the DSV2 migration to Spark 3.
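
       For context, here is a minimal sketch of that hook against the Spark 2.4 DataSource V2 API (the class name `CoordinatedWriter` is hypothetical; this is not the Phoenix implementation). `useCommitCoordinator()` already defaults to `true` in `DataSourceWriter`:

```java
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.sources.v2.writer.DataSourceWriter;
import org.apache.spark.sql.sources.v2.writer.DataWriterFactory;
import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage;

// Hypothetical writer (not the Phoenix code) illustrating the hook.
public class CoordinatedWriter implements DataSourceWriter {

    // Defaults to true in DataSourceWriter. When true, each task attempt asks
    // the driver-side OutputCommitCoordinator for permission before committing,
    // so at most one attempt per partition commits even under task retries or
    // speculative execution.
    @Override
    public boolean useCommitCoordinator() {
        return true;
    }

    @Override
    public DataWriterFactory<InternalRow> createWriterFactory() {
        throw new UnsupportedOperationException("illustration only");
    }

    @Override
    public void commit(WriterCommitMessage[] messages) { }

    @Override
    public void abort(WriterCommitMessage[] messages) { }
}
```

       Returning `false` would instead skip the coordinator round trip and allow duplicate task attempts to commit, leaving deduplication to the sink.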



