Copilot commented on code in PR #665:
URL: https://github.com/apache/wayang/pull/665#discussion_r2872006799


##########
wayang-platforms/wayang-spark/src/test/java/org/apache/wayang/spark/operators/SparkTableSinkTest.java:
##########
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.wayang.spark.operators;
+
+import org.apache.wayang.basic.data.Record;
+import org.apache.wayang.core.api.Configuration;
+import org.apache.wayang.core.api.Job;
+import org.apache.wayang.core.optimizer.OptimizationContext;
+import org.apache.wayang.core.plan.wayangplan.OutputSlot;
+import org.apache.wayang.core.platform.ChannelInstance;
+import org.apache.wayang.core.types.DataSetType;
+import org.apache.wayang.spark.channels.RddChannel;
+import org.apache.wayang.spark.platform.SparkPlatform;

Review Comment:
   This test has unused imports (`OptimizationContext`, `OutputSlot`, 
`SparkPlatform`), which will fail the repository Checkstyle run 
(google_checks.xml includes `UnusedImports`). Please remove the unused imports 
(and any now-unneeded mocking setup) to keep the build green.
   ```suggestion
   import org.apache.wayang.core.platform.ChannelInstance;
   import org.apache.wayang.core.types.DataSetType;
   import org.apache.wayang.spark.channels.RddChannel;
   ```



##########
wayang-commons/wayang-basic/pom.xml:
##########
@@ -120,6 +120,11 @@
             <version>20231013</version>
         </dependency>
 
+        <dependency>
+            <groupId>org.apache.calcite</groupId>
+            <artifactId>calcite-core</artifactId>
+            <version>${calcite.version}</version>
+        </dependency>
         <dependency>

Review Comment:
   Adding `calcite-core` to `wayang-basic` just to access 
`SqlDialect.DatabaseProduct` significantly increases the dependency footprint 
of a low-level module and can introduce version-conflict pressure across the 
build. Consider replacing this with a small internal enum (or moving 
dialect-related utilities into a module that already depends on Calcite) to 
keep `wayang-basic` lightweight.
   ```suggestion
   
   ```



##########
wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/operators/SparkTableSink.java:
##########
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.wayang.spark.operators;
+
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.RowFactory;
+import org.apache.spark.sql.SQLContext;
+import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import org.apache.wayang.basic.data.Record;
+import org.apache.wayang.basic.operators.TableSink;
+import org.apache.wayang.basic.util.SqlTypeUtils;
+import org.apache.wayang.core.api.exception.WayangException;
+import org.apache.wayang.core.optimizer.OptimizationContext;
+import org.apache.wayang.core.plan.wayangplan.ExecutionOperator;
+import org.apache.wayang.core.platform.ChannelDescriptor;
+import org.apache.wayang.core.platform.ChannelInstance;
+import org.apache.wayang.core.platform.lineage.ExecutionLineageNode;
+import org.apache.wayang.core.types.DataSetType;
+import org.apache.wayang.core.util.Tuple;
+import org.apache.wayang.spark.channels.RddChannel;
+import org.apache.wayang.spark.execution.SparkExecutor;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Properties;
+
+public class SparkTableSink<T> extends TableSink<T> implements 
SparkExecutionOperator {
+
+    private SaveMode mode;
+
+    public SparkTableSink(Properties props, String mode, String tableName, 
String... columnNames) {
+        super(props, mode, tableName, columnNames);
+        this.setMode(mode);
+    }
+
+    public SparkTableSink(Properties props, String mode, String tableName, 
String[] columnNames, DataSetType<T> type) {
+        super(props, mode, tableName, columnNames, type);
+        this.setMode(mode);
+    }
+
+    public SparkTableSink(TableSink<T> that) {
+        super(that);
+        this.setMode(that.getMode());
+    }
+
+    @Override
+    public Tuple<Collection<ExecutionLineageNode>, 
Collection<ChannelInstance>> evaluate(
+            ChannelInstance[] inputs,
+            ChannelInstance[] outputs,
+            SparkExecutor sparkExecutor,
+            OptimizationContext.OperatorContext operatorContext) {
+        assert inputs.length == 1;
+        assert outputs.length == 0;
+
+        JavaRDD<T> recordRDD = ((RddChannel.Instance) inputs[0]).provideRdd();
+        Class<T> typeClass = (Class<T>) 
this.getType().getDataUnitType().getTypeClass();
+        SparkSession sparkSession = 
SparkSession.builder().sparkContext(sparkExecutor.sc.sc()).getOrCreate();
+        SQLContext sqlContext = sparkSession.sqlContext();
+
+        Dataset<Row> df;
+        if (typeClass == Record.class) {
+            // Records need manual schema handling
+            if (recordRDD.isEmpty()) {
+                return ExecutionOperator.modelEagerExecution(inputs, outputs, 
operatorContext);
+            }
+            Record first = (Record) recordRDD.first();

Review Comment:
   For `Record` inputs, this performs two Spark actions (`isEmpty()` and then 
`first()`), which trigger two separate jobs and can be expensive. Prefer a single 
`take(1)` (or similar) to both detect emptiness and obtain a sample record for 
schema inference.
   ```suggestion
               List<T> sample = recordRDD.take(1);
               if (sample.isEmpty()) {
                   return ExecutionOperator.modelEagerExecution(inputs, 
outputs, operatorContext);
               }
               Record first = (Record) sample.get(0);
   ```



##########
wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/util/SqlTypeUtils.java:
##########
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.wayang.basic.util;
+
+import org.apache.calcite.sql.SqlDialect;
+import org.apache.wayang.basic.data.Record;
+
+import java.lang.reflect.Field;
+import java.sql.Date;
+import java.sql.Timestamp;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Utility for mapping Java types to SQL types across different dialects.
+ */
+public class SqlTypeUtils {
+
+    private static final Map<SqlDialect.DatabaseProduct, Map<Class<?>, 
String>> dialectTypeMaps = new HashMap<>();
+
+    static {
+        // Default mappings (Standard SQL)
+        Map<Class<?>, String> defaultMap = new HashMap<>();
+        defaultMap.put(Integer.class, "INT");
+        defaultMap.put(int.class, "INT");
+        defaultMap.put(Long.class, "BIGINT");
+        defaultMap.put(long.class, "BIGINT");
+        defaultMap.put(Double.class, "DOUBLE");
+        defaultMap.put(double.class, "DOUBLE");
+        defaultMap.put(Float.class, "FLOAT");
+        defaultMap.put(float.class, "FLOAT");
+        defaultMap.put(Boolean.class, "BOOLEAN");
+        defaultMap.put(boolean.class, "BOOLEAN");
+        defaultMap.put(String.class, "VARCHAR(255)");
+        defaultMap.put(Date.class, "DATE");
+        defaultMap.put(LocalDate.class, "DATE");
+        defaultMap.put(Timestamp.class, "TIMESTAMP");
+        defaultMap.put(LocalDateTime.class, "TIMESTAMP");
+
+        dialectTypeMaps.put(SqlDialect.DatabaseProduct.UNKNOWN, defaultMap);
+
+        // PostgreSQL Overrides
+        Map<Class<?>, String> pgMap = new HashMap<>(defaultMap);
+        pgMap.put(Double.class, "DOUBLE PRECISION");
+        pgMap.put(double.class, "DOUBLE PRECISION");
+        dialectTypeMaps.put(SqlDialect.DatabaseProduct.POSTGRESQL, pgMap);
+
+        // Add more dialects here as needed (MySQL, Oracle, etc.)
+    }
+
+    /**
+     * Detects the database product from a JDBC URL.
+     *
+     * @param url JDBC URL
+     * @return detected DatabaseProduct
+     */
+    public static SqlDialect.DatabaseProduct detectProduct(String url) {
+        if (url == null)
+            return SqlDialect.DatabaseProduct.UNKNOWN;
+        String lowerUrl = url.toLowerCase();
+        if (lowerUrl.contains("postgresql") || lowerUrl.contains("postgres"))
+            return SqlDialect.DatabaseProduct.POSTGRESQL;
+        if (lowerUrl.contains("mysql"))
+            return SqlDialect.DatabaseProduct.MYSQL;
+        if (lowerUrl.contains("oracle"))
+            return SqlDialect.DatabaseProduct.ORACLE;
+        if (lowerUrl.contains("sqlite")) {
+            try {
+                return SqlDialect.DatabaseProduct.valueOf("SQLITE");
+            } catch (Exception e) {
+                return SqlDialect.DatabaseProduct.UNKNOWN;
+            }
+        }
+        if (lowerUrl.contains("h2"))
+            return SqlDialect.DatabaseProduct.H2;
+        if (lowerUrl.contains("derby"))
+            return SqlDialect.DatabaseProduct.DERBY;
+        if (lowerUrl.contains("mssql") || lowerUrl.contains("sqlserver"))
+            return SqlDialect.DatabaseProduct.MSSQL;
+        return SqlDialect.DatabaseProduct.UNKNOWN;
+    }
+
+    /**
+     * Returns the SQL type for a given Java class and database product.
+     *
+     * @param cls     Java class
+     * @param product database product
+     * @return SQL type string
+     */
+    public static String getSqlType(Class<?> cls, SqlDialect.DatabaseProduct 
product) {
+        Map<Class<?>, String> typeMap = dialectTypeMaps.getOrDefault(product,
+                dialectTypeMaps.get(SqlDialect.DatabaseProduct.UNKNOWN));
+        return typeMap.getOrDefault(cls, "VARCHAR(255)");
+    }
+
+    /**
+     * Extracts schema information from a POJO class or a Record.
+     *
+     * @param cls     POJO class
+     * @param product database product
+     * @return a list of schema fields
+     */
+    public static List<SchemaField> getSchema(Class<?> cls, 
SqlDialect.DatabaseProduct product) {
+        List<SchemaField> schema = new ArrayList<>();
+        if (cls == Record.class) {
+            // For Record.class without an instance, we can't derive 
names/types easily
+            // Users should use the instance-based getSchema or provide 
columnNames
+            return schema;
+        }
+
+        for (Field field : cls.getDeclaredFields()) {
+            if (java.lang.reflect.Modifier.isStatic(field.getModifiers())) {
+                continue;
+            }
+            schema.add(new SchemaField(field.getName(), field.getType(), 
getSqlType(field.getType(), product)));
+        }
+        return schema;

Review Comment:
   POJO schema derivation uses `getDeclaredFields()`, but value extraction uses 
`ReflectionUtils.getProperty(...)`, which requires matching `getX()` getters. 
This will fail for private fields without getters (even though they are 
included in the schema) and can also include fields that should not be 
persisted. Consider deriving schema from JavaBeans properties/getters (or at 
least filtering to fields that have corresponding getters).



##########
wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/operators/JavaTableSink.java:
##########
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.wayang.java.operators;
+
+import org.apache.wayang.basic.data.Record;
+import org.apache.wayang.basic.operators.TableSink;
+import org.apache.wayang.basic.util.SqlTypeUtils;
+import org.apache.wayang.core.optimizer.OptimizationContext;
+import org.apache.wayang.core.plan.wayangplan.ExecutionOperator;
+import org.apache.wayang.core.platform.ChannelDescriptor;
+import org.apache.wayang.core.platform.ChannelInstance;
+import org.apache.wayang.core.platform.lineage.ExecutionLineageNode;
+import org.apache.wayang.core.types.DataSetType;
+import org.apache.wayang.core.util.ReflectionUtils;
+import org.apache.wayang.core.util.Tuple;
+import org.apache.wayang.java.channels.CollectionChannel;
+import org.apache.wayang.java.channels.JavaChannelInstance;
+import org.apache.wayang.java.channels.StreamChannel;
+import org.apache.wayang.java.execution.JavaExecutor;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
+
+public class JavaTableSink<T> extends TableSink<T> implements 
JavaExecutionOperator {
+
+    private void setRecordValue(PreparedStatement ps, int index, Object value) 
throws SQLException {
+        if (value == null) {
+            ps.setNull(index, java.sql.Types.NULL);
+        } else if (value instanceof Integer) {
+            ps.setInt(index, (Integer) value);
+        } else if (value instanceof Long) {
+            ps.setLong(index, (Long) value);
+        } else if (value instanceof Double) {
+            ps.setDouble(index, (Double) value);
+        } else if (value instanceof Float) {
+            ps.setFloat(index, (Float) value);
+        } else if (value instanceof Boolean) {
+            ps.setBoolean(index, (Boolean) value);
+        } else if (value instanceof java.sql.Date) {
+            ps.setDate(index, (java.sql.Date) value);
+        } else if (value instanceof java.sql.Timestamp) {
+            ps.setTimestamp(index, (java.sql.Timestamp) value);
+        } else {
+            ps.setString(index, value.toString());
+        }
+    }
+
+    public JavaTableSink(Properties props, String mode, String tableName) {
+        this(props, mode, tableName, null);
+    }
+
+    public JavaTableSink(Properties props, String mode, String tableName, 
String... columnNames) {
+        super(props, mode, tableName, columnNames);
+
+    }
+
+    public JavaTableSink(Properties props, String mode, String tableName, 
String[] columnNames, DataSetType<T> type) {
+        super(props, mode, tableName, columnNames, type);
+
+    }
+
+    public JavaTableSink(TableSink<T> that) {
+        super(that);
+    }
+
+    @Override
+    public Tuple<Collection<ExecutionLineageNode>, 
Collection<ChannelInstance>> evaluate(
+            ChannelInstance[] inputs,
+            ChannelInstance[] outputs,
+            JavaExecutor javaExecutor,
+            OptimizationContext.OperatorContext operatorContext) {
+        assert inputs.length == 1;
+        assert outputs.length == 0;
+        JavaChannelInstance input = (JavaChannelInstance) inputs[0];
+
+        // The stream is converted to an Iterator so that we can read the 
first element
+        // w/o consuming the entire stream.
+        Iterator<T> recordIterator = input.<T>provideStream().iterator();
+
+        if (!recordIterator.hasNext()) {
+            return ExecutionOperator.modelEagerExecution(inputs, outputs, 
operatorContext);
+        }
+
+        // We read the first element to derive the Record schema.
+        T firstElement = recordIterator.next();
+        Class<?> typeClass = this.getType().getDataUnitType().getTypeClass();
+
+        String url = this.getProperties().getProperty("url");
+        org.apache.calcite.sql.SqlDialect.DatabaseProduct product = 
SqlTypeUtils.detectProduct(url);
+
+        List<SqlTypeUtils.SchemaField> schemaFields;
+        if (typeClass != Record.class) {
+            schemaFields = SqlTypeUtils.getSchema(typeClass, product);
+        } else {
+            schemaFields = SqlTypeUtils.getSchema((Record) firstElement, 
product, this.getColumnNames());
+        }
+
+        String[] currentColumnNames = this.getColumnNames();
+        if (currentColumnNames == null || currentColumnNames.length == 0) {
+            currentColumnNames = new String[schemaFields.size()];
+            for (int i = 0; i < schemaFields.size(); i++) {
+                currentColumnNames[i] = schemaFields.get(i).getName();
+            }
+            this.setColumnNames(currentColumnNames);
+        }
+
+        String[] sqlTypes = new String[currentColumnNames.length];
+        for (int i = 0; i < currentColumnNames.length; i++) {
+            sqlTypes[i] = "VARCHAR(255)"; // Default
+            for (SqlTypeUtils.SchemaField field : schemaFields) {
+                if (field.getName().equals(currentColumnNames[i])) {
+                    sqlTypes[i] = field.getSqlType();
+                    break;
+                }
+            }
+        }
+
+        final String[] finalColumnNames = currentColumnNames;
+        final String[] finalSqlTypes = sqlTypes;
+
+        this.getProperties().setProperty("streamingBatchInsert", "True");
+
+        Connection conn;
+        try {
+            Class.forName(this.getProperties().getProperty("driver"));
+            conn = 
DriverManager.getConnection(this.getProperties().getProperty("url"), 
this.getProperties());
+            conn.setAutoCommit(false);
+
+            Statement stmt = conn.createStatement();
+
+            // Drop existing table if the mode is 'overwrite'.
+            if (this.getMode().equals("overwrite")) {
+                stmt.execute("DROP TABLE IF EXISTS " + this.getTableName());
+            }
+
+            // Create a new table if the specified table name does not exist 
yet.
+            StringBuilder sb = new StringBuilder();
+            sb.append("CREATE TABLE IF NOT EXISTS 
").append(this.getTableName()).append(" (");
+            String separator = "";
+            for (int i = 0; i < finalColumnNames.length; i++) {
+                
sb.append(separator).append("\"").append(finalColumnNames[i]).append("\" 
").append(finalSqlTypes[i]);
+                separator = ", ";
+            }
+            sb.append(")");
+            stmt.execute(sb.toString());
+
+            // Create a prepared statement to insert value from the 
recordIterator.
+            sb = new StringBuilder();
+            sb.append("INSERT INTO ").append(this.getTableName()).append(" (");
+            separator = "";
+            for (int i = 0; i < finalColumnNames.length; i++) {
+                
sb.append(separator).append("\"").append(finalColumnNames[i]).append("\"");
+                separator = ", ";
+            }
+            sb.append(") VALUES (");
+            separator = "";
+            for (int i = 0; i < finalColumnNames.length; i++) {
+                sb.append(separator).append("?");
+                separator = ", ";
+            }
+            sb.append(")");
+            PreparedStatement ps = conn.prepareStatement(sb.toString());
+
+            // The schema Record has to be pushed to the database too.
+            this.pushToStatement(ps, firstElement, typeClass, 
finalColumnNames);
+            ps.addBatch();
+
+            // Iterate through all remaining records and add them to the 
prepared statement
+            recordIterator.forEachRemaining(
+                    r -> {
+                        try {
+                            this.pushToStatement(ps, r, typeClass, 
finalColumnNames);
+                            ps.addBatch();
+                        } catch (SQLException e) {
+                            e.printStackTrace();
+                        }
+                    });
+
+            ps.executeBatch();
+            conn.commit();
+            conn.close();
+        } catch (ClassNotFoundException e) {
+            System.out.println("Please specify a correct database driver.");
+            e.printStackTrace();
+        } catch (SQLException e) {
+            e.printStackTrace();

Review Comment:
   JDBC resources (`Connection`, `Statement`, `PreparedStatement`) are not 
managed with try-with-resources, so an exception before `conn.close()` will 
leak connections/statements and may leave an open transaction. Use 
try-with-resources (or a `finally`) for `Connection`, `Statement`, and 
`PreparedStatement`, and roll back on failure when `autoCommit=false`.
   ```suggestion
               try (PreparedStatement ps = 
conn.prepareStatement(sb.toString())) {
   
                   // The schema Record has to be pushed to the database too.
                   this.pushToStatement(ps, firstElement, typeClass, 
finalColumnNames);
                   ps.addBatch();
   
                   // Iterate through all remaining records and add them to the 
prepared statement
                   recordIterator.forEachRemaining(
                           r -> {
                               try {
                                   this.pushToStatement(ps, r, typeClass, 
finalColumnNames);
                                   ps.addBatch();
                               } catch (SQLException e) {
                                   e.printStackTrace();
                               }
                           });
   
                   ps.executeBatch();
                   conn.commit();
               }
           } catch (ClassNotFoundException e) {
               System.out.println("Please specify a correct database driver.");
               e.printStackTrace();
           } catch (SQLException e) {
               try {
                   if (conn != null && !conn.getAutoCommit()) {
                       conn.rollback();
                   }
               } catch (SQLException rollbackException) {
                   rollbackException.printStackTrace();
               }
               e.printStackTrace();
           } finally {
               if (conn != null) {
                   try {
                       conn.close();
                   } catch (SQLException closeException) {
                       closeException.printStackTrace();
                   }
               }
   ```



##########
wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/operators/SparkTableSink.java:
##########
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.wayang.spark.operators;
+
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.RowFactory;
+import org.apache.spark.sql.SQLContext;
+import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import org.apache.wayang.basic.data.Record;
+import org.apache.wayang.basic.operators.TableSink;
+import org.apache.wayang.basic.util.SqlTypeUtils;
+import org.apache.wayang.core.api.exception.WayangException;
+import org.apache.wayang.core.optimizer.OptimizationContext;
+import org.apache.wayang.core.plan.wayangplan.ExecutionOperator;
+import org.apache.wayang.core.platform.ChannelDescriptor;
+import org.apache.wayang.core.platform.ChannelInstance;
+import org.apache.wayang.core.platform.lineage.ExecutionLineageNode;
+import org.apache.wayang.core.types.DataSetType;
+import org.apache.wayang.core.util.Tuple;
+import org.apache.wayang.spark.channels.RddChannel;
+import org.apache.wayang.spark.execution.SparkExecutor;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Properties;
+
+public class SparkTableSink<T> extends TableSink<T> implements 
SparkExecutionOperator {
+
+    private SaveMode mode;
+
+    public SparkTableSink(Properties props, String mode, String tableName, 
String... columnNames) {
+        super(props, mode, tableName, columnNames);
+        this.setMode(mode);
+    }
+
+    public SparkTableSink(Properties props, String mode, String tableName, 
String[] columnNames, DataSetType<T> type) {
+        super(props, mode, tableName, columnNames, type);
+        this.setMode(mode);
+    }
+
+    public SparkTableSink(TableSink<T> that) {
+        super(that);
+        this.setMode(that.getMode());
+    }
+
+    @Override
+    public Tuple<Collection<ExecutionLineageNode>, 
Collection<ChannelInstance>> evaluate(
+            ChannelInstance[] inputs,
+            ChannelInstance[] outputs,
+            SparkExecutor sparkExecutor,
+            OptimizationContext.OperatorContext operatorContext) {
+        assert inputs.length == 1;
+        assert outputs.length == 0;
+
+        JavaRDD<T> recordRDD = ((RddChannel.Instance) inputs[0]).provideRdd();
+        Class<T> typeClass = (Class<T>) 
this.getType().getDataUnitType().getTypeClass();
+        SparkSession sparkSession = 
SparkSession.builder().sparkContext(sparkExecutor.sc.sc()).getOrCreate();
+        SQLContext sqlContext = sparkSession.sqlContext();
+
+        Dataset<Row> df;
+        if (typeClass == Record.class) {
+            // Records need manual schema handling
+            if (recordRDD.isEmpty()) {
+                return ExecutionOperator.modelEagerExecution(inputs, outputs, 
operatorContext);
+            }
+            Record first = (Record) recordRDD.first();
+
+            // Centralized Schema Derivation
+            List<SqlTypeUtils.SchemaField> schemaFields = 
SqlTypeUtils.getSchema(first,
+                    
SqlTypeUtils.detectProduct(this.getProperties().getProperty("url")),
+                    this.getColumnNames());
+
+            // Map Record to Row
+            JavaRDD<Row> rowRDD = recordRDD.map(rec -> 
RowFactory.create(((Record) rec).getValues()));
+
+            // Build Spark Schema
+            StructField[] fields = new StructField[schemaFields.size()];
+            for (int i = 0; i < schemaFields.size(); i++) {
+                SqlTypeUtils.SchemaField sf = schemaFields.get(i);
+                org.apache.spark.sql.types.DataType sparkType = 
getSparkDataType(sf.getJavaClass());
+                fields[i] = new StructField(sf.getName(), sparkType, true, 
Metadata.empty());
+            }
+
+            // Update column names in the operator if they were generated
+            String[] newColNames = 
schemaFields.stream().map(SqlTypeUtils.SchemaField::getName).toArray(String[]::new);
+            this.setColumnNames(newColNames);
+
+            df = sqlContext.createDataFrame(rowRDD, new StructType(fields));
+        } else {
+            // POJO Case: Let Spark handle it natively
+            df = sqlContext.createDataFrame(recordRDD, typeClass);
+            // If columnNames are provided, we should probably select/rename 
them,
+            // but usually createDataFrame(rdd, beanClass) maps fields to 
columns.
+            if (this.getColumnNames() != null && this.getColumnNames().length 
> 0) {
+                // Optionally filter or reorder columns to match 
this.getColumnNames()
+                // For now, Spark's native mapping is preferred.
+            }
+        }
+
+        this.getProperties().setProperty("batchSize", "250000");
+        df.write()
+                .mode(this.mode)
+                .jdbc(this.getProperties().getProperty("url"), 
this.getTableName(), this.getProperties());

Review Comment:
   `this.getProperties().setProperty("batchSize", "250000")` mutates the 
user-provided JDBC `Properties` and unconditionally overrides any 
caller-supplied batch size. Avoid mutating the shared configuration: apply the 
default only when the option is absent, and ideally do so on a defensive copy 
of the `Properties` that is then passed to `.jdbc(...)`.
   ```suggestion
           // Create a defensive copy of the JDBC properties to avoid mutating 
caller-provided config.
           Properties jdbcProps = new Properties();
           jdbcProps.putAll(this.getProperties());
           // Apply a default batchSize only if the caller did not specify one.
           if (!jdbcProps.containsKey("batchSize")) {
               jdbcProps.setProperty("batchSize", "250000");
           }
           df.write()
                   .mode(this.mode)
                   .jdbc(jdbcProps.getProperty("url"), this.getTableName(), 
jdbcProps);
   ```



##########
wayang-platforms/wayang-java/pom.xml:
##########
@@ -78,7 +78,19 @@
             <artifactId>log4j-slf4j-impl</artifactId>
             <version>2.20.0</version>
         </dependency>
+        <dependency>
+            <groupId>org.postgresql</groupId>
+            <artifactId>postgresql</artifactId>
+            <version>42.7.2</version>
+            <scope>test</scope>
+        </dependency>

Review Comment:
   `org.postgresql:postgresql` is added as a test dependency in the Java 
platform module, but the new tests use H2 and there is no usage of the 
PostgreSQL driver in this module’s test sources. If it’s not needed for other 
tests, removing it will reduce build time/dependency surface; otherwise, 
consider adding a test that actually covers PostgreSQL-specific behavior.
   ```suggestion
   
   ```



##########
wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/operators/JavaTableSink.java:
##########
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.wayang.java.operators;
+
+import org.apache.wayang.basic.data.Record;
+import org.apache.wayang.basic.operators.TableSink;
+import org.apache.wayang.basic.util.SqlTypeUtils;
+import org.apache.wayang.core.optimizer.OptimizationContext;
+import org.apache.wayang.core.plan.wayangplan.ExecutionOperator;
+import org.apache.wayang.core.platform.ChannelDescriptor;
+import org.apache.wayang.core.platform.ChannelInstance;
+import org.apache.wayang.core.platform.lineage.ExecutionLineageNode;
+import org.apache.wayang.core.types.DataSetType;
+import org.apache.wayang.core.util.ReflectionUtils;
+import org.apache.wayang.core.util.Tuple;
+import org.apache.wayang.java.channels.CollectionChannel;
+import org.apache.wayang.java.channels.JavaChannelInstance;
+import org.apache.wayang.java.channels.StreamChannel;
+import org.apache.wayang.java.execution.JavaExecutor;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
+
+public class JavaTableSink<T> extends TableSink<T> implements 
JavaExecutionOperator {
+
+    private void setRecordValue(PreparedStatement ps, int index, Object value) 
throws SQLException {
+        if (value == null) {
+            ps.setNull(index, java.sql.Types.NULL);
+        } else if (value instanceof Integer) {
+            ps.setInt(index, (Integer) value);
+        } else if (value instanceof Long) {
+            ps.setLong(index, (Long) value);
+        } else if (value instanceof Double) {
+            ps.setDouble(index, (Double) value);
+        } else if (value instanceof Float) {
+            ps.setFloat(index, (Float) value);
+        } else if (value instanceof Boolean) {
+            ps.setBoolean(index, (Boolean) value);
+        } else if (value instanceof java.sql.Date) {
+            ps.setDate(index, (java.sql.Date) value);
+        } else if (value instanceof java.sql.Timestamp) {
+            ps.setTimestamp(index, (java.sql.Timestamp) value);
+        } else {
+            ps.setString(index, value.toString());
+        }
+    }
+
+    public JavaTableSink(Properties props, String mode, String tableName) {
+        this(props, mode, tableName, null);
+    }
+
+    public JavaTableSink(Properties props, String mode, String tableName, 
String... columnNames) {
+        super(props, mode, tableName, columnNames);
+
+    }
+
+    public JavaTableSink(Properties props, String mode, String tableName, 
String[] columnNames, DataSetType<T> type) {
+        super(props, mode, tableName, columnNames, type);
+
+    }
+
+    public JavaTableSink(TableSink<T> that) {
+        super(that);
+    }
+
+    @Override
+    public Tuple<Collection<ExecutionLineageNode>, 
Collection<ChannelInstance>> evaluate(
+            ChannelInstance[] inputs,
+            ChannelInstance[] outputs,
+            JavaExecutor javaExecutor,
+            OptimizationContext.OperatorContext operatorContext) {
+        assert inputs.length == 1;
+        assert outputs.length == 0;
+        JavaChannelInstance input = (JavaChannelInstance) inputs[0];
+
+        // The stream is converted to an Iterator so that we can read the 
first element
+        // w/o consuming the entire stream.
+        Iterator<T> recordIterator = input.<T>provideStream().iterator();
+
+        if (!recordIterator.hasNext()) {
+            return ExecutionOperator.modelEagerExecution(inputs, outputs, 
operatorContext);
+        }
+
+        // We read the first element to derive the Record schema.
+        T firstElement = recordIterator.next();
+        Class<?> typeClass = this.getType().getDataUnitType().getTypeClass();
+
+        String url = this.getProperties().getProperty("url");
+        org.apache.calcite.sql.SqlDialect.DatabaseProduct product = 
SqlTypeUtils.detectProduct(url);
+
+        List<SqlTypeUtils.SchemaField> schemaFields;
+        if (typeClass != Record.class) {
+            schemaFields = SqlTypeUtils.getSchema(typeClass, product);
+        } else {
+            schemaFields = SqlTypeUtils.getSchema((Record) firstElement, 
product, this.getColumnNames());
+        }
+
+        String[] currentColumnNames = this.getColumnNames();
+        if (currentColumnNames == null || currentColumnNames.length == 0) {
+            currentColumnNames = new String[schemaFields.size()];
+            for (int i = 0; i < schemaFields.size(); i++) {
+                currentColumnNames[i] = schemaFields.get(i).getName();
+            }
+            this.setColumnNames(currentColumnNames);
+        }
+
+        String[] sqlTypes = new String[currentColumnNames.length];
+        for (int i = 0; i < currentColumnNames.length; i++) {
+            sqlTypes[i] = "VARCHAR(255)"; // Default
+            for (SqlTypeUtils.SchemaField field : schemaFields) {
+                if (field.getName().equals(currentColumnNames[i])) {
+                    sqlTypes[i] = field.getSqlType();
+                    break;
+                }
+            }
+        }
+
+        final String[] finalColumnNames = currentColumnNames;
+        final String[] finalSqlTypes = sqlTypes;
+
+        this.getProperties().setProperty("streamingBatchInsert", "True");
+
+        Connection conn;
+        try {
+            Class.forName(this.getProperties().getProperty("driver"));
+            conn = 
DriverManager.getConnection(this.getProperties().getProperty("url"), 
this.getProperties());
+            conn.setAutoCommit(false);
+
+            Statement stmt = conn.createStatement();
+
+            // Drop existing table if the mode is 'overwrite'.
+            if (this.getMode().equals("overwrite")) {
+                stmt.execute("DROP TABLE IF EXISTS " + this.getTableName());
+            }
+
+            // Create a new table if the specified table name does not exist 
yet.
+            StringBuilder sb = new StringBuilder();
+            sb.append("CREATE TABLE IF NOT EXISTS 
").append(this.getTableName()).append(" (");
+            String separator = "";
+            for (int i = 0; i < finalColumnNames.length; i++) {
+                
sb.append(separator).append("\"").append(finalColumnNames[i]).append("\" 
").append(finalSqlTypes[i]);
+                separator = ", ";
+            }
+            sb.append(")");
+            stmt.execute(sb.toString());
+
+            // Create a prepared statement to insert value from the 
recordIterator.
+            sb = new StringBuilder();
+            sb.append("INSERT INTO ").append(this.getTableName()).append(" (");
+            separator = "";

Review Comment:
   `tableName` is concatenated directly into SQL statements 
(DROP/CREATE/INSERT). Since it is user-provided, this is vulnerable to SQL 
injection and also breaks on reserved words/special characters. Please validate 
identifiers (e.g., strict `[A-Za-z0-9_]+` allowlist) and/or use proper 
dialect-specific quoting for the table name as well.



##########
wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/operators/SparkTableSink.java:
##########
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.wayang.spark.operators;
+
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.RowFactory;
+import org.apache.spark.sql.SQLContext;
+import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import org.apache.wayang.basic.data.Record;
+import org.apache.wayang.basic.operators.TableSink;
+import org.apache.wayang.basic.util.SqlTypeUtils;
+import org.apache.wayang.core.api.exception.WayangException;
+import org.apache.wayang.core.optimizer.OptimizationContext;
+import org.apache.wayang.core.plan.wayangplan.ExecutionOperator;
+import org.apache.wayang.core.platform.ChannelDescriptor;
+import org.apache.wayang.core.platform.ChannelInstance;
+import org.apache.wayang.core.platform.lineage.ExecutionLineageNode;
+import org.apache.wayang.core.types.DataSetType;
+import org.apache.wayang.core.util.Tuple;
+import org.apache.wayang.spark.channels.RddChannel;
+import org.apache.wayang.spark.execution.SparkExecutor;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Properties;
+
+public class SparkTableSink<T> extends TableSink<T> implements 
SparkExecutionOperator {
+
+    private SaveMode mode;
+
+    public SparkTableSink(Properties props, String mode, String tableName, 
String... columnNames) {
+        super(props, mode, tableName, columnNames);
+        this.setMode(mode);
+    }
+
+    public SparkTableSink(Properties props, String mode, String tableName, 
String[] columnNames, DataSetType<T> type) {
+        super(props, mode, tableName, columnNames, type);
+        this.setMode(mode);
+    }
+
+    public SparkTableSink(TableSink<T> that) {
+        super(that);
+        this.setMode(that.getMode());
+    }
+
+    @Override
+    public Tuple<Collection<ExecutionLineageNode>, 
Collection<ChannelInstance>> evaluate(
+            ChannelInstance[] inputs,
+            ChannelInstance[] outputs,
+            SparkExecutor sparkExecutor,
+            OptimizationContext.OperatorContext operatorContext) {
+        assert inputs.length == 1;
+        assert outputs.length == 0;
+
+        JavaRDD<T> recordRDD = ((RddChannel.Instance) inputs[0]).provideRdd();
+        Class<T> typeClass = (Class<T>) 
this.getType().getDataUnitType().getTypeClass();
+        SparkSession sparkSession = 
SparkSession.builder().sparkContext(sparkExecutor.sc.sc()).getOrCreate();
+        SQLContext sqlContext = sparkSession.sqlContext();
+
+        Dataset<Row> df;
+        if (typeClass == Record.class) {
+            // Records need manual schema handling
+            if (recordRDD.isEmpty()) {
+                return ExecutionOperator.modelEagerExecution(inputs, outputs, 
operatorContext);
+            }
+            Record first = (Record) recordRDD.first();
+
+            // Centralized Schema Derivation
+            List<SqlTypeUtils.SchemaField> schemaFields = 
SqlTypeUtils.getSchema(first,
+                    
SqlTypeUtils.detectProduct(this.getProperties().getProperty("url")),
+                    this.getColumnNames());
+
+            // Map Record to Row
+            JavaRDD<Row> rowRDD = recordRDD.map(rec -> 
RowFactory.create(((Record) rec).getValues()));
+
+            // Build Spark Schema
+            StructField[] fields = new StructField[schemaFields.size()];
+            for (int i = 0; i < schemaFields.size(); i++) {
+                SqlTypeUtils.SchemaField sf = schemaFields.get(i);
+                org.apache.spark.sql.types.DataType sparkType = 
getSparkDataType(sf.getJavaClass());
+                fields[i] = new StructField(sf.getName(), sparkType, true, 
Metadata.empty());
+            }
+
+            // Update column names in the operator if they were generated
+            String[] newColNames = 
schemaFields.stream().map(SqlTypeUtils.SchemaField::getName).toArray(String[]::new);
+            this.setColumnNames(newColNames);
+
+            df = sqlContext.createDataFrame(rowRDD, new StructType(fields));
+        } else {
+            // POJO Case: Let Spark handle it natively
+            df = sqlContext.createDataFrame(recordRDD, typeClass);
+            // If columnNames are provided, we should probably select/rename 
them,
+            // but usually createDataFrame(rdd, beanClass) maps fields to 
columns.
+            if (this.getColumnNames() != null && this.getColumnNames().length 
> 0) {
+                // Optionally filter or reorder columns to match 
this.getColumnNames()
+                // For now, Spark's native mapping is preferred.
+            }
+        }
+
+        this.getProperties().setProperty("batchSize", "250000");
+        df.write()
+                .mode(this.mode)
+                .jdbc(this.getProperties().getProperty("url"), 
this.getTableName(), this.getProperties());
+
+        return ExecutionOperator.modelEagerExecution(inputs, outputs, 
operatorContext);
+    }
+

Review Comment:
   `DataFrameWriter.jdbc(url, tableName, ...)` treats the `tableName`/`dbtable` 
option as a SQL fragment (Spark allows subqueries like `(select ...) t`), so 
passing an unchecked user-provided table name can enable SQL injection or 
unexpected writes. Consider validating/quoting `tableName` (and documenting 
allowed forms) before passing it to Spark JDBC.
   ```suggestion
                   .jdbc(this.getProperties().getProperty("url"), 
sanitizeTableName(this.getTableName()), this.getProperties());
   
           return ExecutionOperator.modelEagerExecution(inputs, outputs, 
operatorContext);
       }
   
       private static final java.util.regex.Pattern TABLE_NAME_PATTERN =
               java.util.regex.Pattern.compile("[A-Za-z0-9_\\.]+");
   
       /**
        * Validate the table name before passing it to Spark's JDBC writer.
        * Spark treats the `tableName` argument as a SQL fragment, so we 
restrict it
        * to a conservative set of characters to avoid SQL injection or 
unintended writes.
        */
       private static String sanitizeTableName(String tableName) {
           if (tableName == null || 
!TABLE_NAME_PATTERN.matcher(tableName).matches()) {
               throw new WayangException("Illegal table name for Spark JDBC 
sink: " + tableName);
           }
           return tableName;
       }
   ```



##########
wayang-platforms/wayang-spark/pom.xml:
##########
@@ -121,5 +121,18 @@
             <version>4.8</version>
         </dependency>
 
+        <dependency>
+            <groupId>org.postgresql</groupId>
+            <artifactId>postgresql</artifactId>
+            <version>42.7.2</version>
+            <scope>compile</scope>

Review Comment:
   `org.postgresql:postgresql` is added with `compile` scope in the Spark 
platform module. This makes the Spark platform depend on a specific JDBC driver 
even though the sink is intended to be generic JDBC; it also increases artifact 
size and can cause driver/version conflicts for downstream users. Consider 
removing it or changing it to `test`/`runtime`/`optional`, letting applications 
provide their chosen driver.
   ```suggestion
               <scope>test</scope>
   ```



##########
wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/operators/TableSink.java:
##########
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.wayang.basic.operators;
+
+import org.apache.wayang.basic.data.Record;
+import org.apache.wayang.core.plan.wayangplan.UnarySink;
+import org.apache.wayang.core.types.DataSetType;
+
+import java.util.Properties;
+
+/**
+ * {@link UnarySink} that writes Records to a database table.
+ */
+
+public class TableSink<T> extends UnarySink<T> {
+    private final String tableName;
+
+    private String[] columnNames;
+
+    private final Properties props;
+
+    private String mode;
+
+    /**
+     * Creates a new instance.
+     *
+     * @param props       database connection properties
+     * @param mode        write mode
+     * @param tableName   name of the table to be written
+     * @param columnNames names of the columns in the tables
+     */
+    public TableSink(Properties props, String mode, String tableName, 
String... columnNames) {
+        this(props, mode, tableName, columnNames, (DataSetType<T>) 
DataSetType.createDefault(Record.class));
+    }
+
+    public TableSink(Properties props, String mode, String tableName, String[] 
columnNames, DataSetType<T> type) {
+        super(type);
+        this.tableName = tableName;
+        this.columnNames = columnNames;
+        this.props = props;
+        this.mode = mode;
+    }
+
+    /**
+     * Copies an instance (exclusive of broadcasts).
+     *
+     * @param that that should be copied
+     */
+    public TableSink(TableSink<T> that) {
+        super(that);
+        this.tableName = that.getTableName();
+        this.columnNames = that.getColumnNames();
+        this.props = that.getProperties();
+        this.mode = that.getMode();
+    }

Review Comment:
   The copy constructor keeps references to the original `Properties` and 
columnNames array (`this.props = that.getProperties()` / `this.columnNames = 
that.getColumnNames()`). Because these are mutable and also exposed via 
getters, later mutations (e.g., SparkTableSink setting JDBC options or 
regenerating column names) can unexpectedly affect other operator instances. 
Consider defensively copying in constructors/getters — e.g., a fresh 
`Properties` populated via `putAll(props)` (note that `new Properties(props)` 
only installs `props` as *defaults* and does not copy its entries) and 
`Arrays.copyOf(columnNames, columnNames.length)` for the array.



##########
wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/util/SqlTypeUtils.java:
##########
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.wayang.basic.util;
+
+import org.apache.calcite.sql.SqlDialect;
+import org.apache.wayang.basic.data.Record;
+
+import java.lang.reflect.Field;
+import java.sql.Date;
+import java.sql.Timestamp;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Utility for mapping Java types to SQL types across different dialects.
+ */
+public class SqlTypeUtils {
+
+    private static final Map<SqlDialect.DatabaseProduct, Map<Class<?>, 
String>> dialectTypeMaps = new HashMap<>();
+
+    static {
+        // Default mappings (Standard SQL)
+        Map<Class<?>, String> defaultMap = new HashMap<>();
+        defaultMap.put(Integer.class, "INT");
+        defaultMap.put(int.class, "INT");
+        defaultMap.put(Long.class, "BIGINT");
+        defaultMap.put(long.class, "BIGINT");
+        defaultMap.put(Double.class, "DOUBLE");
+        defaultMap.put(double.class, "DOUBLE");
+        defaultMap.put(Float.class, "FLOAT");
+        defaultMap.put(float.class, "FLOAT");
+        defaultMap.put(Boolean.class, "BOOLEAN");
+        defaultMap.put(boolean.class, "BOOLEAN");
+        defaultMap.put(String.class, "VARCHAR(255)");
+        defaultMap.put(Date.class, "DATE");
+        defaultMap.put(LocalDate.class, "DATE");
+        defaultMap.put(Timestamp.class, "TIMESTAMP");
+        defaultMap.put(LocalDateTime.class, "TIMESTAMP");
+
+        dialectTypeMaps.put(SqlDialect.DatabaseProduct.UNKNOWN, defaultMap);
+
+        // PostgreSQL Overrides
+        Map<Class<?>, String> pgMap = new HashMap<>(defaultMap);
+        pgMap.put(Double.class, "DOUBLE PRECISION");
+        pgMap.put(double.class, "DOUBLE PRECISION");
+        dialectTypeMaps.put(SqlDialect.DatabaseProduct.POSTGRESQL, pgMap);
+
+        // Add more dialects here as needed (MySQL, Oracle, etc.)
+    }
+
+    /**
+     * Detects the database product from a JDBC URL.
+     *
+     * @param url JDBC URL
+     * @return detected DatabaseProduct
+     */
+    public static SqlDialect.DatabaseProduct detectProduct(String url) {
+        if (url == null)
+            return SqlDialect.DatabaseProduct.UNKNOWN;
+        String lowerUrl = url.toLowerCase();
+        if (lowerUrl.contains("postgresql") || lowerUrl.contains("postgres"))
+            return SqlDialect.DatabaseProduct.POSTGRESQL;
+        if (lowerUrl.contains("mysql"))
+            return SqlDialect.DatabaseProduct.MYSQL;
+        if (lowerUrl.contains("oracle"))
+            return SqlDialect.DatabaseProduct.ORACLE;
+        if (lowerUrl.contains("sqlite")) {
+            try {
+                return SqlDialect.DatabaseProduct.valueOf("SQLITE");
+            } catch (Exception e) {
+                return SqlDialect.DatabaseProduct.UNKNOWN;
+            }
+        }
+        if (lowerUrl.contains("h2"))
+            return SqlDialect.DatabaseProduct.H2;
+        if (lowerUrl.contains("derby"))
+            return SqlDialect.DatabaseProduct.DERBY;
+        if (lowerUrl.contains("mssql") || lowerUrl.contains("sqlserver"))
+            return SqlDialect.DatabaseProduct.MSSQL;
+        return SqlDialect.DatabaseProduct.UNKNOWN;
+    }
+
+    /**
+     * Returns the SQL type for a given Java class and database product.
+     *
+     * @param cls     Java class
+     * @param product database product
+     * @return SQL type string
+     */
+    public static String getSqlType(Class<?> cls, SqlDialect.DatabaseProduct 
product) {
+        Map<Class<?>, String> typeMap = dialectTypeMaps.getOrDefault(product,
+                dialectTypeMaps.get(SqlDialect.DatabaseProduct.UNKNOWN));
+        return typeMap.getOrDefault(cls, "VARCHAR(255)");
+    }
+
+    /**
+     * Extracts schema information from a POJO class or a Record.
+     *
+     * @param cls     POJO class
+     * @param product database product
+     * @return a list of schema fields
+     */
+    public static List<SchemaField> getSchema(Class<?> cls, 
SqlDialect.DatabaseProduct product) {
+        List<SchemaField> schema = new ArrayList<>();
+        if (cls == Record.class) {
+            // For Record.class without an instance, we can't derive 
names/types easily
+            // Users should use the instance-based getSchema or provide 
columnNames
+            return schema;
+        }
+
+        for (Field field : cls.getDeclaredFields()) {
+            if (java.lang.reflect.Modifier.isStatic(field.getModifiers())) {
+                continue;
+            }
+            schema.add(new SchemaField(field.getName(), field.getType(), 
getSqlType(field.getType(), product)));
+        }
+        return schema;

Review Comment:
   `Class#getDeclaredFields()` does not guarantee a stable order across 
JVMs/compilers, but the derived schema order determines column ordering (and 
tests assume a specific order). To avoid nondeterministic schemas, sort fields 
deterministically (e.g., by name) or use a stable property-introspection 
approach.



##########
wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/operators/SparkTableSink.java:
##########
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.wayang.spark.operators;
+
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.RowFactory;
+import org.apache.spark.sql.SQLContext;
+import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import org.apache.wayang.basic.data.Record;
+import org.apache.wayang.basic.operators.TableSink;
+import org.apache.wayang.basic.util.SqlTypeUtils;
+import org.apache.wayang.core.api.exception.WayangException;
+import org.apache.wayang.core.optimizer.OptimizationContext;
+import org.apache.wayang.core.plan.wayangplan.ExecutionOperator;
+import org.apache.wayang.core.platform.ChannelDescriptor;
+import org.apache.wayang.core.platform.ChannelInstance;
+import org.apache.wayang.core.platform.lineage.ExecutionLineageNode;
+import org.apache.wayang.core.types.DataSetType;
+import org.apache.wayang.core.util.Tuple;
+import org.apache.wayang.spark.channels.RddChannel;
+import org.apache.wayang.spark.execution.SparkExecutor;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Properties;
+
+public class SparkTableSink<T> extends TableSink<T> implements 
SparkExecutionOperator {
+
+    private SaveMode mode;
+
+    public SparkTableSink(Properties props, String mode, String tableName, 
String... columnNames) {
+        super(props, mode, tableName, columnNames);
+        this.setMode(mode);
+    }
+
+    public SparkTableSink(Properties props, String mode, String tableName, 
String[] columnNames, DataSetType<T> type) {
+        super(props, mode, tableName, columnNames, type);
+        this.setMode(mode);
+    }
+
+    public SparkTableSink(TableSink<T> that) {
+        super(that);
+        this.setMode(that.getMode());
+    }
+
+    @Override
+    public Tuple<Collection<ExecutionLineageNode>, 
Collection<ChannelInstance>> evaluate(
+            ChannelInstance[] inputs,
+            ChannelInstance[] outputs,
+            SparkExecutor sparkExecutor,
+            OptimizationContext.OperatorContext operatorContext) {
+        assert inputs.length == 1;
+        assert outputs.length == 0;
+
+        JavaRDD<T> recordRDD = ((RddChannel.Instance) inputs[0]).provideRdd();
+        Class<T> typeClass = (Class<T>) 
this.getType().getDataUnitType().getTypeClass();
+        SparkSession sparkSession = 
SparkSession.builder().sparkContext(sparkExecutor.sc.sc()).getOrCreate();
+        SQLContext sqlContext = sparkSession.sqlContext();
+
+        Dataset<Row> df;
+        if (typeClass == Record.class) {
+            // Records need manual schema handling
+            if (recordRDD.isEmpty()) {
+                return ExecutionOperator.modelEagerExecution(inputs, outputs, 
operatorContext);
+            }
+            Record first = (Record) recordRDD.first();
+
+            // Centralized Schema Derivation
+            List<SqlTypeUtils.SchemaField> schemaFields = 
SqlTypeUtils.getSchema(first,
+                    
SqlTypeUtils.detectProduct(this.getProperties().getProperty("url")),
+                    this.getColumnNames());
+
+            // Map Record to Row
+            JavaRDD<Row> rowRDD = recordRDD.map(rec -> 
RowFactory.create(((Record) rec).getValues()));
+
+            // Build Spark Schema
+            StructField[] fields = new StructField[schemaFields.size()];
+            for (int i = 0; i < schemaFields.size(); i++) {
+                SqlTypeUtils.SchemaField sf = schemaFields.get(i);
+                org.apache.spark.sql.types.DataType sparkType = 
getSparkDataType(sf.getJavaClass());
+                fields[i] = new StructField(sf.getName(), sparkType, true, 
Metadata.empty());
+            }
+
+            // Update column names in the operator if they were generated
+            String[] newColNames = 
schemaFields.stream().map(SqlTypeUtils.SchemaField::getName).toArray(String[]::new);
+            this.setColumnNames(newColNames);
+
+            df = sqlContext.createDataFrame(rowRDD, new StructType(fields));
+        } else {
+            // POJO Case: Let Spark handle it natively
+            df = sqlContext.createDataFrame(recordRDD, typeClass);
+            // If columnNames are provided, we should probably select/rename 
them,
+            // but usually createDataFrame(rdd, beanClass) maps fields to 
columns.
+            if (this.getColumnNames() != null && this.getColumnNames().length 
> 0) {
+                // Optionally filter or reorder columns to match 
this.getColumnNames()
+                // For now, Spark's native mapping is preferred.

Review Comment:
   In the POJO branch, `columnNames` passed to `TableSink` are silently ignored 
(the code comments mention it but no behavior is applied). This makes the API 
misleading because callers might expect renaming/reordering or subset 
selection. Either implement the mapping (select/rename) or validate and reject 
`columnNames` for POJO inputs with a clear error.
   ```suggestion
               // For POJOs, we currently do not support custom columnNames to 
avoid
               // ambiguous or misleading mappings. Fail fast if they are 
provided.
               String[] columnNames = this.getColumnNames();
               if (columnNames != null && columnNames.length > 0) {
                   throw new WayangException(
                           "columnNames are not supported for POJO inputs in 
SparkTableSink. " +
                           "Either omit columnNames or use Record inputs if you 
need custom column mapping.");
   ```



##########
wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/operators/SparkTableSink.java:
##########
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.wayang.spark.operators;
+
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.RowFactory;
+import org.apache.spark.sql.SQLContext;
+import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import org.apache.wayang.basic.data.Record;
+import org.apache.wayang.basic.operators.TableSink;
+import org.apache.wayang.basic.util.SqlTypeUtils;
+import org.apache.wayang.core.api.exception.WayangException;
+import org.apache.wayang.core.optimizer.OptimizationContext;
+import org.apache.wayang.core.plan.wayangplan.ExecutionOperator;
+import org.apache.wayang.core.platform.ChannelDescriptor;
+import org.apache.wayang.core.platform.ChannelInstance;
+import org.apache.wayang.core.platform.lineage.ExecutionLineageNode;
+import org.apache.wayang.core.types.DataSetType;
+import org.apache.wayang.core.util.Tuple;
+import org.apache.wayang.spark.channels.RddChannel;
+import org.apache.wayang.spark.execution.SparkExecutor;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Properties;
+
+public class SparkTableSink<T> extends TableSink<T> implements 
SparkExecutionOperator {
+
+    private SaveMode mode;
+
+    public SparkTableSink(Properties props, String mode, String tableName, 
String... columnNames) {
+        super(props, mode, tableName, columnNames);
+        this.setMode(mode);
+    }
+
+    public SparkTableSink(Properties props, String mode, String tableName, 
String[] columnNames, DataSetType<T> type) {
+        super(props, mode, tableName, columnNames, type);
+        this.setMode(mode);
+    }
+
+    public SparkTableSink(TableSink<T> that) {
+        super(that);
+        this.setMode(that.getMode());
+    }
+
+    @Override
+    public Tuple<Collection<ExecutionLineageNode>, 
Collection<ChannelInstance>> evaluate(
+            ChannelInstance[] inputs,
+            ChannelInstance[] outputs,
+            SparkExecutor sparkExecutor,
+            OptimizationContext.OperatorContext operatorContext) {
+        assert inputs.length == 1;
+        assert outputs.length == 0;
+
+        JavaRDD<T> recordRDD = ((RddChannel.Instance) inputs[0]).provideRdd();
+        Class<T> typeClass = (Class<T>) 
this.getType().getDataUnitType().getTypeClass();
+        SparkSession sparkSession = 
SparkSession.builder().sparkContext(sparkExecutor.sc.sc()).getOrCreate();
+        SQLContext sqlContext = sparkSession.sqlContext();
+
+        Dataset<Row> df;
+        if (typeClass == Record.class) {
+            // Records need manual schema handling
+            if (recordRDD.isEmpty()) {
+                return ExecutionOperator.modelEagerExecution(inputs, outputs, 
operatorContext);
+            }
+            Record first = (Record) recordRDD.first();
+
+            // Centralized Schema Derivation
+            List<SqlTypeUtils.SchemaField> schemaFields = 
SqlTypeUtils.getSchema(first,
+                    
SqlTypeUtils.detectProduct(this.getProperties().getProperty("url")),
+                    this.getColumnNames());
+
+            // Map Record to Row
+            JavaRDD<Row> rowRDD = recordRDD.map(rec -> 
RowFactory.create(((Record) rec).getValues()));
+
+            // Build Spark Schema
+            StructField[] fields = new StructField[schemaFields.size()];
+            for (int i = 0; i < schemaFields.size(); i++) {
+                SqlTypeUtils.SchemaField sf = schemaFields.get(i);
+                org.apache.spark.sql.types.DataType sparkType = 
getSparkDataType(sf.getJavaClass());
+                fields[i] = new StructField(sf.getName(), sparkType, true, 
Metadata.empty());
+            }
+
+            // Update column names in the operator if they were generated
+            String[] newColNames = 
schemaFields.stream().map(SqlTypeUtils.SchemaField::getName).toArray(String[]::new);
+            this.setColumnNames(newColNames);
+

Review Comment:
   `this.setColumnNames(...)` mutates the operator based on runtime data (the first record). Because `TableSink` copies `columnNames` by reference, this inferred schema can leak across operator copies and reuses of the same plan. Prefer keeping the inferred column names local to this evaluation, or make `TableSink` take defensive copies and avoid mutating shared operator state at execution time.
   ```suggestion
   
   ```



##########
wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/operators/JavaTableSink.java:
##########
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.wayang.java.operators;
+
+import org.apache.wayang.basic.data.Record;
+import org.apache.wayang.basic.operators.TableSink;
+import org.apache.wayang.basic.util.SqlTypeUtils;
+import org.apache.wayang.core.optimizer.OptimizationContext;
+import org.apache.wayang.core.plan.wayangplan.ExecutionOperator;
+import org.apache.wayang.core.platform.ChannelDescriptor;
+import org.apache.wayang.core.platform.ChannelInstance;
+import org.apache.wayang.core.platform.lineage.ExecutionLineageNode;
+import org.apache.wayang.core.types.DataSetType;
+import org.apache.wayang.core.util.ReflectionUtils;
+import org.apache.wayang.core.util.Tuple;
+import org.apache.wayang.java.channels.CollectionChannel;
+import org.apache.wayang.java.channels.JavaChannelInstance;
+import org.apache.wayang.java.channels.StreamChannel;
+import org.apache.wayang.java.execution.JavaExecutor;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
+
+public class JavaTableSink<T> extends TableSink<T> implements 
JavaExecutionOperator {
+
+    private void setRecordValue(PreparedStatement ps, int index, Object value) 
throws SQLException {
+        if (value == null) {
+            ps.setNull(index, java.sql.Types.NULL);
+        } else if (value instanceof Integer) {
+            ps.setInt(index, (Integer) value);
+        } else if (value instanceof Long) {
+            ps.setLong(index, (Long) value);
+        } else if (value instanceof Double) {
+            ps.setDouble(index, (Double) value);
+        } else if (value instanceof Float) {
+            ps.setFloat(index, (Float) value);
+        } else if (value instanceof Boolean) {
+            ps.setBoolean(index, (Boolean) value);
+        } else if (value instanceof java.sql.Date) {
+            ps.setDate(index, (java.sql.Date) value);
+        } else if (value instanceof java.sql.Timestamp) {
+            ps.setTimestamp(index, (java.sql.Timestamp) value);
+        } else {
+            ps.setString(index, value.toString());
+        }
+    }
+
+    public JavaTableSink(Properties props, String mode, String tableName) {
+        this(props, mode, tableName, null);
+    }
+
+    public JavaTableSink(Properties props, String mode, String tableName, 
String... columnNames) {
+        super(props, mode, tableName, columnNames);
+
+    }
+
+    public JavaTableSink(Properties props, String mode, String tableName, 
String[] columnNames, DataSetType<T> type) {
+        super(props, mode, tableName, columnNames, type);
+
+    }
+
+    public JavaTableSink(TableSink<T> that) {
+        super(that);
+    }
+
+    @Override
+    public Tuple<Collection<ExecutionLineageNode>, 
Collection<ChannelInstance>> evaluate(
+            ChannelInstance[] inputs,
+            ChannelInstance[] outputs,
+            JavaExecutor javaExecutor,
+            OptimizationContext.OperatorContext operatorContext) {
+        assert inputs.length == 1;
+        assert outputs.length == 0;
+        JavaChannelInstance input = (JavaChannelInstance) inputs[0];
+
+        // The stream is converted to an Iterator so that we can read the 
first element
+        // w/o consuming the entire stream.
+        Iterator<T> recordIterator = input.<T>provideStream().iterator();
+
+        if (!recordIterator.hasNext()) {
+            return ExecutionOperator.modelEagerExecution(inputs, outputs, 
operatorContext);
+        }
+
+        // We read the first element to derive the Record schema.
+        T firstElement = recordIterator.next();
+        Class<?> typeClass = this.getType().getDataUnitType().getTypeClass();
+
+        String url = this.getProperties().getProperty("url");
+        org.apache.calcite.sql.SqlDialect.DatabaseProduct product = 
SqlTypeUtils.detectProduct(url);
+
+        List<SqlTypeUtils.SchemaField> schemaFields;
+        if (typeClass != Record.class) {
+            schemaFields = SqlTypeUtils.getSchema(typeClass, product);
+        } else {
+            schemaFields = SqlTypeUtils.getSchema((Record) firstElement, 
product, this.getColumnNames());
+        }
+
+        String[] currentColumnNames = this.getColumnNames();
+        if (currentColumnNames == null || currentColumnNames.length == 0) {
+            currentColumnNames = new String[schemaFields.size()];
+            for (int i = 0; i < schemaFields.size(); i++) {
+                currentColumnNames[i] = schemaFields.get(i).getName();
+            }
+            this.setColumnNames(currentColumnNames);
+        }
+
+        String[] sqlTypes = new String[currentColumnNames.length];
+        for (int i = 0; i < currentColumnNames.length; i++) {
+            sqlTypes[i] = "VARCHAR(255)"; // Default
+            for (SqlTypeUtils.SchemaField field : schemaFields) {
+                if (field.getName().equals(currentColumnNames[i])) {
+                    sqlTypes[i] = field.getSqlType();
+                    break;
+                }
+            }
+        }
+
+        final String[] finalColumnNames = currentColumnNames;
+        final String[] finalSqlTypes = sqlTypes;
+
+        this.getProperties().setProperty("streamingBatchInsert", "True");
+
+        Connection conn;
+        try {
+            Class.forName(this.getProperties().getProperty("driver"));
+            conn = 
DriverManager.getConnection(this.getProperties().getProperty("url"), 
this.getProperties());
+            conn.setAutoCommit(false);
+
+            Statement stmt = conn.createStatement();
+
+            // Drop existing table if the mode is 'overwrite'.
+            if (this.getMode().equals("overwrite")) {
+                stmt.execute("DROP TABLE IF EXISTS " + this.getTableName());
+            }
+
+            // Create a new table if the specified table name does not exist 
yet.
+            StringBuilder sb = new StringBuilder();
+            sb.append("CREATE TABLE IF NOT EXISTS 
").append(this.getTableName()).append(" (");
+            String separator = "";
+            for (int i = 0; i < finalColumnNames.length; i++) {
+                
sb.append(separator).append("\"").append(finalColumnNames[i]).append("\" 
").append(finalSqlTypes[i]);
+                separator = ", ";
+            }
+            sb.append(")");
+            stmt.execute(sb.toString());
+
+            // Create a prepared statement to insert value from the 
recordIterator.
+            sb = new StringBuilder();
+            sb.append("INSERT INTO ").append(this.getTableName()).append(" (");
+            separator = "";
+            for (int i = 0; i < finalColumnNames.length; i++) {
+                
sb.append(separator).append("\"").append(finalColumnNames[i]).append("\"");
+                separator = ", ";
+            }
+            sb.append(") VALUES (");
+            separator = "";
+            for (int i = 0; i < finalColumnNames.length; i++) {
+                sb.append(separator).append("?");
+                separator = ", ";
+            }
+            sb.append(")");
+            PreparedStatement ps = conn.prepareStatement(sb.toString());
+
+            // The schema Record has to be pushed to the database too.
+            this.pushToStatement(ps, firstElement, typeClass, 
finalColumnNames);
+            ps.addBatch();
+
+            // Iterate through all remaining records and add them to the 
prepared statement
+            recordIterator.forEachRemaining(
+                    r -> {
+                        try {
+                            this.pushToStatement(ps, r, typeClass, 
finalColumnNames);
+                            ps.addBatch();
+                        } catch (SQLException e) {
+                            e.printStackTrace();
+                        }
+                    });
+
+            ps.executeBatch();
+            conn.commit();
+            conn.close();
+        } catch (ClassNotFoundException e) {
+            System.out.println("Please specify a correct database driver.");
+            e.printStackTrace();
+        } catch (SQLException e) {
+            e.printStackTrace();
+        }

Review Comment:
   Exceptions during driver loading and JDBC operations are currently handled with `System.out.println` plus `printStackTrace()`, after which the operator continues and reports success. This makes sink failures easy to miss: the job completes even though no data (or only partial data) was written. Please throw a `WayangException` instead, and ensure any open transaction is rolled back and the connection/statement are closed (e.g., via try-with-resources), so that failures actually fail the execution.



##########
wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/operators/JavaTableSink.java:
##########
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.wayang.java.operators;
+
+import org.apache.wayang.basic.data.Record;
+import org.apache.wayang.basic.operators.TableSink;
+import org.apache.wayang.basic.util.SqlTypeUtils;
+import org.apache.wayang.core.optimizer.OptimizationContext;
+import org.apache.wayang.core.plan.wayangplan.ExecutionOperator;
+import org.apache.wayang.core.platform.ChannelDescriptor;
+import org.apache.wayang.core.platform.ChannelInstance;
+import org.apache.wayang.core.platform.lineage.ExecutionLineageNode;
+import org.apache.wayang.core.types.DataSetType;
+import org.apache.wayang.core.util.ReflectionUtils;
+import org.apache.wayang.core.util.Tuple;
+import org.apache.wayang.java.channels.CollectionChannel;
+import org.apache.wayang.java.channels.JavaChannelInstance;
+import org.apache.wayang.java.channels.StreamChannel;
+import org.apache.wayang.java.execution.JavaExecutor;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
+
+public class JavaTableSink<T> extends TableSink<T> implements 
JavaExecutionOperator {
+
+    private void setRecordValue(PreparedStatement ps, int index, Object value) 
throws SQLException {
+        if (value == null) {
+            ps.setNull(index, java.sql.Types.NULL);
+        } else if (value instanceof Integer) {
+            ps.setInt(index, (Integer) value);
+        } else if (value instanceof Long) {
+            ps.setLong(index, (Long) value);
+        } else if (value instanceof Double) {
+            ps.setDouble(index, (Double) value);
+        } else if (value instanceof Float) {
+            ps.setFloat(index, (Float) value);
+        } else if (value instanceof Boolean) {
+            ps.setBoolean(index, (Boolean) value);
+        } else if (value instanceof java.sql.Date) {
+            ps.setDate(index, (java.sql.Date) value);
+        } else if (value instanceof java.sql.Timestamp) {
+            ps.setTimestamp(index, (java.sql.Timestamp) value);
+        } else {
+            ps.setString(index, value.toString());
+        }
+    }
+
+    public JavaTableSink(Properties props, String mode, String tableName) {
+        this(props, mode, tableName, null);
+    }
+
+    public JavaTableSink(Properties props, String mode, String tableName, 
String... columnNames) {
+        super(props, mode, tableName, columnNames);
+
+    }
+
+    public JavaTableSink(Properties props, String mode, String tableName, 
String[] columnNames, DataSetType<T> type) {
+        super(props, mode, tableName, columnNames, type);
+
+    }
+
+    public JavaTableSink(TableSink<T> that) {
+        super(that);
+    }
+
+    @Override
+    public Tuple<Collection<ExecutionLineageNode>, 
Collection<ChannelInstance>> evaluate(
+            ChannelInstance[] inputs,
+            ChannelInstance[] outputs,
+            JavaExecutor javaExecutor,
+            OptimizationContext.OperatorContext operatorContext) {
+        assert inputs.length == 1;
+        assert outputs.length == 0;
+        JavaChannelInstance input = (JavaChannelInstance) inputs[0];
+
+        // The stream is converted to an Iterator so that we can read the 
first element
+        // w/o consuming the entire stream.
+        Iterator<T> recordIterator = input.<T>provideStream().iterator();
+
+        if (!recordIterator.hasNext()) {
+            return ExecutionOperator.modelEagerExecution(inputs, outputs, 
operatorContext);
+        }
+
+        // We read the first element to derive the Record schema.
+        T firstElement = recordIterator.next();
+        Class<?> typeClass = this.getType().getDataUnitType().getTypeClass();
+
+        String url = this.getProperties().getProperty("url");
+        org.apache.calcite.sql.SqlDialect.DatabaseProduct product = 
SqlTypeUtils.detectProduct(url);
+
+        List<SqlTypeUtils.SchemaField> schemaFields;
+        if (typeClass != Record.class) {
+            schemaFields = SqlTypeUtils.getSchema(typeClass, product);
+        } else {
+            schemaFields = SqlTypeUtils.getSchema((Record) firstElement, 
product, this.getColumnNames());
+        }
+
+        String[] currentColumnNames = this.getColumnNames();
+        if (currentColumnNames == null || currentColumnNames.length == 0) {
+            currentColumnNames = new String[schemaFields.size()];
+            for (int i = 0; i < schemaFields.size(); i++) {
+                currentColumnNames[i] = schemaFields.get(i).getName();
+            }
+            this.setColumnNames(currentColumnNames);
+        }
+
+        String[] sqlTypes = new String[currentColumnNames.length];
+        for (int i = 0; i < currentColumnNames.length; i++) {
+            sqlTypes[i] = "VARCHAR(255)"; // Default
+            for (SqlTypeUtils.SchemaField field : schemaFields) {
+                if (field.getName().equals(currentColumnNames[i])) {
+                    sqlTypes[i] = field.getSqlType();
+                    break;
+                }
+            }
+        }
+
+        final String[] finalColumnNames = currentColumnNames;
+        final String[] finalSqlTypes = sqlTypes;
+
+        this.getProperties().setProperty("streamingBatchInsert", "True");
+
+        Connection conn;
+        try {
+            Class.forName(this.getProperties().getProperty("driver"));
+            conn = 
DriverManager.getConnection(this.getProperties().getProperty("url"), 
this.getProperties());
+            conn.setAutoCommit(false);
+
+            Statement stmt = conn.createStatement();
+
+            // Drop existing table if the mode is 'overwrite'.
+            if (this.getMode().equals("overwrite")) {
+                stmt.execute("DROP TABLE IF EXISTS " + this.getTableName());
+            }
+
+            // Create a new table if the specified table name does not exist 
yet.
+            StringBuilder sb = new StringBuilder();
+            sb.append("CREATE TABLE IF NOT EXISTS 
").append(this.getTableName()).append(" (");
+            String separator = "";
+            for (int i = 0; i < finalColumnNames.length; i++) {
+                
sb.append(separator).append("\"").append(finalColumnNames[i]).append("\" 
").append(finalSqlTypes[i]);
+                separator = ", ";
+            }
+            sb.append(")");
+            stmt.execute(sb.toString());
+
+            // Create a prepared statement to insert value from the 
recordIterator.
+            sb = new StringBuilder();
+            sb.append("INSERT INTO ").append(this.getTableName()).append(" (");
+            separator = "";
+            for (int i = 0; i < finalColumnNames.length; i++) {
+                
sb.append(separator).append("\"").append(finalColumnNames[i]).append("\"");
+                separator = ", ";
+            }
+            sb.append(") VALUES (");
+            separator = "";
+            for (int i = 0; i < finalColumnNames.length; i++) {
+                sb.append(separator).append("?");
+                separator = ", ";
+            }
+            sb.append(")");
+            PreparedStatement ps = conn.prepareStatement(sb.toString());
+
+            // The schema Record has to be pushed to the database too.
+            this.pushToStatement(ps, firstElement, typeClass, 
finalColumnNames);
+            ps.addBatch();
+
+            // Iterate through all remaining records and add them to the 
prepared statement
+            recordIterator.forEachRemaining(
+                    r -> {
+                        try {
+                            this.pushToStatement(ps, r, typeClass, 
finalColumnNames);
+                            ps.addBatch();
+                        } catch (SQLException e) {
+                            e.printStackTrace();
+                        }
+                    });
+
+            ps.executeBatch();
+            conn.commit();
+            conn.close();
+        } catch (ClassNotFoundException e) {
+            System.out.println("Please specify a correct database driver.");
+            e.printStackTrace();
+        } catch (SQLException e) {
+            e.printStackTrace();
+        }
+
+        return ExecutionOperator.modelEagerExecution(inputs, outputs, 
operatorContext);
+    }
+
+    private void pushToStatement(PreparedStatement ps, T element, Class<?> 
typeClass, String[] columnNames)
+            throws SQLException {
+        if (typeClass == Record.class) {
+            Record r = (Record) element;
+            for (int i = 0; i < columnNames.length; i++) {
+                setRecordValue(ps, i + 1, r.getField(i));
+            }
+        } else {

Review Comment:
   For `Record` inputs, the code assumes `columnNames.length <= record.size()` and indexes fields by position. If `columnNames` is longer than the `Record`, `r.getField(i)` will throw an `IndexOutOfBoundsException` at runtime, deep inside batch insertion. Please validate the lengths up front (with a clear error message naming the mismatch), or derive `columnNames` from the record size when none are provided.



##########
wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/operators/JavaTableSink.java:
##########
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.wayang.java.operators;
+
+import org.apache.wayang.basic.data.Record;
+import org.apache.wayang.basic.operators.TableSink;
+import org.apache.wayang.basic.util.SqlTypeUtils;
+import org.apache.wayang.core.optimizer.OptimizationContext;
+import org.apache.wayang.core.plan.wayangplan.ExecutionOperator;
+import org.apache.wayang.core.platform.ChannelDescriptor;
+import org.apache.wayang.core.platform.ChannelInstance;
+import org.apache.wayang.core.platform.lineage.ExecutionLineageNode;
+import org.apache.wayang.core.types.DataSetType;
+import org.apache.wayang.core.util.ReflectionUtils;
+import org.apache.wayang.core.util.Tuple;
+import org.apache.wayang.java.channels.CollectionChannel;
+import org.apache.wayang.java.channels.JavaChannelInstance;
+import org.apache.wayang.java.channels.StreamChannel;
+import org.apache.wayang.java.execution.JavaExecutor;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
+
+public class JavaTableSink<T> extends TableSink<T> implements 
JavaExecutionOperator {
+
+    private void setRecordValue(PreparedStatement ps, int index, Object value) 
throws SQLException {
+        if (value == null) {
+            ps.setNull(index, java.sql.Types.NULL);
+        } else if (value instanceof Integer) {
+            ps.setInt(index, (Integer) value);
+        } else if (value instanceof Long) {
+            ps.setLong(index, (Long) value);
+        } else if (value instanceof Double) {
+            ps.setDouble(index, (Double) value);
+        } else if (value instanceof Float) {
+            ps.setFloat(index, (Float) value);
+        } else if (value instanceof Boolean) {
+            ps.setBoolean(index, (Boolean) value);
+        } else if (value instanceof java.sql.Date) {
+            ps.setDate(index, (java.sql.Date) value);
+        } else if (value instanceof java.sql.Timestamp) {
+            ps.setTimestamp(index, (java.sql.Timestamp) value);
+        } else {
+            ps.setString(index, value.toString());
+        }
+    }
+
+    public JavaTableSink(Properties props, String mode, String tableName) {
+        this(props, mode, tableName, null);
+    }
+
+    public JavaTableSink(Properties props, String mode, String tableName, 
String... columnNames) {
+        super(props, mode, tableName, columnNames);
+
+    }
+
+    public JavaTableSink(Properties props, String mode, String tableName, 
String[] columnNames, DataSetType<T> type) {
+        super(props, mode, tableName, columnNames, type);
+
+    }
+
+    public JavaTableSink(TableSink<T> that) {
+        super(that);
+    }
+
+    @Override
+    public Tuple<Collection<ExecutionLineageNode>, 
Collection<ChannelInstance>> evaluate(
+            ChannelInstance[] inputs,
+            ChannelInstance[] outputs,
+            JavaExecutor javaExecutor,
+            OptimizationContext.OperatorContext operatorContext) {
+        assert inputs.length == 1;
+        assert outputs.length == 0;
+        JavaChannelInstance input = (JavaChannelInstance) inputs[0];
+
+        // The stream is converted to an Iterator so that we can read the 
first element
+        // w/o consuming the entire stream.
+        Iterator<T> recordIterator = input.<T>provideStream().iterator();
+
+        if (!recordIterator.hasNext()) {
+            return ExecutionOperator.modelEagerExecution(inputs, outputs, 
operatorContext);
+        }
+
+        // We read the first element to derive the Record schema.
+        T firstElement = recordIterator.next();
+        Class<?> typeClass = this.getType().getDataUnitType().getTypeClass();
+
+        String url = this.getProperties().getProperty("url");
+        org.apache.calcite.sql.SqlDialect.DatabaseProduct product = 
SqlTypeUtils.detectProduct(url);
+
+        List<SqlTypeUtils.SchemaField> schemaFields;
+        if (typeClass != Record.class) {
+            schemaFields = SqlTypeUtils.getSchema(typeClass, product);
+        } else {
+            schemaFields = SqlTypeUtils.getSchema((Record) firstElement, 
product, this.getColumnNames());
+        }
+
+        String[] currentColumnNames = this.getColumnNames();
+        if (currentColumnNames == null || currentColumnNames.length == 0) {
+            currentColumnNames = new String[schemaFields.size()];
+            for (int i = 0; i < schemaFields.size(); i++) {
+                currentColumnNames[i] = schemaFields.get(i).getName();
+            }
+            this.setColumnNames(currentColumnNames);
+        }
+
+        String[] sqlTypes = new String[currentColumnNames.length];
+        for (int i = 0; i < currentColumnNames.length; i++) {
+            sqlTypes[i] = "VARCHAR(255)"; // Default
+            for (SqlTypeUtils.SchemaField field : schemaFields) {
+                if (field.getName().equals(currentColumnNames[i])) {
+                    sqlTypes[i] = field.getSqlType();
+                    break;
+                }
+            }
+        }
+
+        final String[] finalColumnNames = currentColumnNames;
+        final String[] finalSqlTypes = sqlTypes;
+
+        this.getProperties().setProperty("streamingBatchInsert", "True");
+
+        Connection conn;
+        try {
+            Class.forName(this.getProperties().getProperty("driver"));
+            conn = 
DriverManager.getConnection(this.getProperties().getProperty("url"), 
this.getProperties());
+            conn.setAutoCommit(false);
+
+            Statement stmt = conn.createStatement();
+
+            // Drop existing table if the mode is 'overwrite'.
+            if (this.getMode().equals("overwrite")) {
+                stmt.execute("DROP TABLE IF EXISTS " + this.getTableName());
+            }
+
+            // Create a new table if the specified table name does not exist 
yet.
+            StringBuilder sb = new StringBuilder();
+            sb.append("CREATE TABLE IF NOT EXISTS 
").append(this.getTableName()).append(" (");
+            String separator = "";
+            for (int i = 0; i < finalColumnNames.length; i++) {
+                
sb.append(separator).append("\"").append(finalColumnNames[i]).append("\" 
").append(finalSqlTypes[i]);
+                separator = ", ";
+            }
+            sb.append(")");
+            stmt.execute(sb.toString());
+
+            // Create a prepared statement to insert value from the 
recordIterator.
+            sb = new StringBuilder();
+            sb.append("INSERT INTO ").append(this.getTableName()).append(" (");
+            separator = "";
+            for (int i = 0; i < finalColumnNames.length; i++) {
+                
sb.append(separator).append("\"").append(finalColumnNames[i]).append("\"");
+                separator = ", ";
+            }
+            sb.append(") VALUES (");
+            separator = "";
+            for (int i = 0; i < finalColumnNames.length; i++) {
+                sb.append(separator).append("?");
+                separator = ", ";
+            }
+            sb.append(")");
+            PreparedStatement ps = conn.prepareStatement(sb.toString());
+
+            // The schema Record has to be pushed to the database too.
+            this.pushToStatement(ps, firstElement, typeClass, 
finalColumnNames);
+            ps.addBatch();
+
+            // Iterate through all remaining records and add them to the 
prepared statement
+            recordIterator.forEachRemaining(
+                    r -> {
+                        try {
+                            this.pushToStatement(ps, r, typeClass, 
finalColumnNames);
+                            ps.addBatch();
+                        } catch (SQLException e) {
+                            e.printStackTrace();
+                        }
+                    });

Review Comment:
   `recordIterator.forEachRemaining` swallows `SQLException` via 
`printStackTrace()` and keeps building the batch, which can lead to 
partial/incorrect writes without failing the operator. Propagate the failure 
(e.g., wrap in a runtime exception), then abort/rollback and surface it as a 
`WayangException` so the job fails deterministically.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to