zhztheplayer commented on code in PR #9397: URL: https://github.com/apache/incubator-gluten/pull/9397#discussion_r2128389033
########## gluten-substrait/src/main/java/org/apache/gluten/connector/write/ColumnarBatchDataWriterFactory.java: ########## @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gluten.connector.write; + +import org.apache.spark.annotation.Evolving; +import org.apache.spark.sql.connector.write.BatchWrite; +import org.apache.spark.sql.connector.write.DataWriter; +import org.apache.spark.sql.connector.write.PhysicalWriteInfo; +import org.apache.spark.sql.vectorized.ColumnarBatch; + +import java.io.Serializable; + +/** + * A factory of {@link DataWriter} returned by {@link + * BatchWrite#createBatchWriterFactory(PhysicalWriteInfo)}, which is responsible for creating and + * initializing the actual data writer at executor side. + * + * <p>Note that, the writer factory will be serialized and sent to executors, then the data writer + * will be created on executors and do the actual writing. So this interface must be serializable + * and {@link DataWriter} doesn't need to be. + * + * @since 3.0.0 + */ Review Comment: Let's rephrase the comments. Maybe we just convey that it's a companion interface with Spark's row-based version. ########## gluten-iceberg/src/main/java/org/apache/gluten/connector/write/ColumnarBatchWrite.java: ########## @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gluten.connector.write; + +import org.apache.spark.sql.connector.write.BatchWrite; +import org.apache.spark.sql.connector.write.DataWriterFactory; +import org.apache.spark.sql.connector.write.PhysicalWriteInfo; + +public abstract class ColumnarBatchWrite implements BatchWrite { + @Override + public DataWriterFactory createBatchWriterFactory(PhysicalWriteInfo info) { + throw new UnsupportedOperationException(); + } + + public ColumnarDataWriterFactory createColumnarBatchWriterFactory(PhysicalWriteInfo info) { + throw new UnsupportedOperationException(); + } +} Review Comment: Could move the API to `gluten-substrait`? ########## gluten-substrait/src/main/scala/org/apache/gluten/execution/ColumnarAppendDataExec.scala: ########## @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gluten.execution + +import org.apache.gluten.connector.write.ColumnarBatchDataWriterFactory +import org.apache.gluten.extension.columnar.transition.Convention +import org.apache.gluten.extension.columnar.transition.Convention.RowType + +import org.apache.spark.{SparkException, TaskContext} +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.connector.write.{BatchWrite, Write, WriterCommitMessage} +import org.apache.spark.sql.datasources.v2.{DataWritingColumnarBatchSparkTask, DataWritingColumnarBatchSparkTaskResult, StreamWriterCommitProgressUtil, WritingColumnarBatchSparkTask} +import org.apache.spark.sql.execution.SparkPlan +import org.apache.spark.sql.execution.datasources.v2._ +import org.apache.spark.sql.execution.metric.SQLMetric +import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.vectorized.ColumnarBatch +import org.apache.spark.util.LongAccumulator + +abstract class ColumnarAppendDataExec(query: SparkPlan, refreshCache: () => Unit, write: Write) + extends V2ExistingTableWriteExec + with ValidatablePlan { + + def writingTaskBatch: WritingColumnarBatchSparkTask[_] = DataWritingColumnarBatchSparkTask + + override def doExecute(): RDD[InternalRow] = { + result + sparkContext.parallelize(Nil, 1) + } + + def createFactory(schema: StructType): ColumnarBatchDataWriterFactory + + protected def writeColumnarBatchWithV2(batchWrite: BatchWrite): Unit = { Review Comment: If the code could be reused, can we follow Spark's pattern to move it into a trait like `ColumnarV2ExistingTableWriteExec` or something? ########## cpp/velox/compute/iceberg/IcebergWriter.h: ########## @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "compute/iceberg/IcebergFormat.h" +#include "memory/VeloxColumnarBatch.h" +#include "velox/connectors/hive/iceberg/IcebergDataSink.h" + +namespace gluten { + +class IcebergWriter { Review Comment: We had an V1 write API in C++ https://github.com/apache/incubator-gluten/blob/main/cpp/velox/operators/writer/VeloxDataSource.h How can we possibly unify these writers together? ########## cpp/velox/jni/VeloxJniWrapper.cc: ########## @@ -463,6 +463,59 @@ JNIEXPORT jlong JNICALL Java_org_apache_gluten_columnarbatch_VeloxColumnarBatchJ JNI_METHOD_END(kInvalidObjectHandle) } +#ifdef GLUTEN_ENABLE_ICEBERG_WRITE Review Comment: Can we wait a bit until we have the functionality public accessible in either upstream Velox or IBM Velox? I hope that will not block the schedule here but not sure. ########## gluten-iceberg/src/main/java/org/apache/gluten/connector/write/ColumnarDataWriterFactory.java: ########## @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gluten.connector.write; + +import org.apache.spark.TaskContext; +import org.apache.spark.annotation.Evolving; +import org.apache.spark.sql.connector.write.BatchWrite; +import org.apache.spark.sql.connector.write.DataWriter; +import org.apache.spark.sql.connector.write.PhysicalWriteInfo; +import org.apache.spark.sql.vectorized.ColumnarBatch; + +import java.io.Serializable; + +/** + * A factory of {@link DataWriter} returned by {@link + * BatchWrite#createBatchWriterFactory(PhysicalWriteInfo)}, which is responsible for creating and + * initializing the actual data writer at executor side. + * + * <p>Note that, the writer factory will be serialized and sent to executors, then the data writer + * will be created on executors and do the actual writing. So this interface must be serializable + * and {@link DataWriter} doesn't need to be. + * + * @since 3.0.0 + */ +@Evolving +public interface ColumnarDataWriterFactory extends Serializable { Review Comment: Ditto, could move to `gluten-substrait` and revive the docs. ########## gluten-substrait/src/main/scala/org/apache/spark/sql/datasources/v2/AppendColumnarBatchDataExec.scala: ########## @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.datasources.v2 + +import org.apache.gluten.connector.write.ColumnarBatchDataWriterFactory + +import org.apache.spark.TaskContext +import org.apache.spark.internal.Logging +import org.apache.spark.sql.connector.write._ +import org.apache.spark.sql.execution.datasources.v2.StreamWriterCommitProgress +import org.apache.spark.sql.execution.metric.{CustomMetrics, SQLMetric} +import org.apache.spark.sql.vectorized.ColumnarBatch +import org.apache.spark.util.Utils + +case class DataWritingColumnarBatchSparkTaskResult( + numRows: Long, Review Comment: Why the file is named "AppendColumnarBatchDataExec.scala"? ########## gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala: ########## @@ -158,4 +158,6 @@ trait BackendSettingsApi { def supportIcebergEqualityDeleteRead(): Boolean = true + def supportAppendDataExec(): Boolean = false Review Comment: Do we have to add the API? Given the plan is offloaded out of the regular `tag-pull-offload` cycle. Or else we could just decide whether to offload the node in `VeloxIcebergAppendDataExec` based on certain conditions. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
