Copilot commented on code in PR #665: URL: https://github.com/apache/wayang/pull/665#discussion_r2872006799
########## wayang-platforms/wayang-spark/src/test/java/org/apache/wayang/spark/operators/SparkTableSinkTest.java: ########## @@ -0,0 +1,281 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.wayang.spark.operators; + +import org.apache.wayang.basic.data.Record; +import org.apache.wayang.core.api.Configuration; +import org.apache.wayang.core.api.Job; +import org.apache.wayang.core.optimizer.OptimizationContext; +import org.apache.wayang.core.plan.wayangplan.OutputSlot; +import org.apache.wayang.core.platform.ChannelInstance; +import org.apache.wayang.core.types.DataSetType; +import org.apache.wayang.spark.channels.RddChannel; +import org.apache.wayang.spark.platform.SparkPlatform; Review Comment: This test has unused imports (`OptimizationContext`, `OutputSlot`, `SparkPlatform`), which will fail the repository Checkstyle run (google_checks.xml includes `UnusedImports`). Please remove the unused imports (and any now-unneeded mocking setup) to keep the build green. 
```suggestion import org.apache.wayang.core.platform.ChannelInstance; import org.apache.wayang.core.types.DataSetType; import org.apache.wayang.spark.channels.RddChannel; ``` ########## wayang-commons/wayang-basic/pom.xml: ########## @@ -120,6 +120,11 @@ <version>20231013</version> </dependency> + <dependency> + <groupId>org.apache.calcite</groupId> + <artifactId>calcite-core</artifactId> + <version>${calcite.version}</version> + </dependency> <dependency> Review Comment: Adding `calcite-core` to `wayang-basic` just to access `SqlDialect.DatabaseProduct` significantly increases the dependency footprint of a low-level module and can introduce version-conflict pressure across the build. Consider replacing this with a small internal enum (or moving dialect-related utilities into a module that already depends on Calcite) to keep `wayang-basic` lightweight. ```suggestion ``` ########## wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/operators/SparkTableSink.java: ########## @@ -0,0 +1,185 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.wayang.spark.operators; + +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.RowFactory; +import org.apache.spark.sql.SQLContext; +import org.apache.spark.sql.SaveMode; +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.types.DataTypes; +import org.apache.spark.sql.types.Metadata; +import org.apache.spark.sql.types.StructField; +import org.apache.spark.sql.types.StructType; +import org.apache.wayang.basic.data.Record; +import org.apache.wayang.basic.operators.TableSink; +import org.apache.wayang.basic.util.SqlTypeUtils; +import org.apache.wayang.core.api.exception.WayangException; +import org.apache.wayang.core.optimizer.OptimizationContext; +import org.apache.wayang.core.plan.wayangplan.ExecutionOperator; +import org.apache.wayang.core.platform.ChannelDescriptor; +import org.apache.wayang.core.platform.ChannelInstance; +import org.apache.wayang.core.platform.lineage.ExecutionLineageNode; +import org.apache.wayang.core.types.DataSetType; +import org.apache.wayang.core.util.Tuple; +import org.apache.wayang.spark.channels.RddChannel; +import org.apache.wayang.spark.execution.SparkExecutor; + +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.Properties; + +public class SparkTableSink<T> extends TableSink<T> implements SparkExecutionOperator { + + private SaveMode mode; + + public SparkTableSink(Properties props, String mode, String tableName, String... 
columnNames) { + super(props, mode, tableName, columnNames); + this.setMode(mode); + } + + public SparkTableSink(Properties props, String mode, String tableName, String[] columnNames, DataSetType<T> type) { + super(props, mode, tableName, columnNames, type); + this.setMode(mode); + } + + public SparkTableSink(TableSink<T> that) { + super(that); + this.setMode(that.getMode()); + } + + @Override + public Tuple<Collection<ExecutionLineageNode>, Collection<ChannelInstance>> evaluate( + ChannelInstance[] inputs, + ChannelInstance[] outputs, + SparkExecutor sparkExecutor, + OptimizationContext.OperatorContext operatorContext) { + assert inputs.length == 1; + assert outputs.length == 0; + + JavaRDD<T> recordRDD = ((RddChannel.Instance) inputs[0]).provideRdd(); + Class<T> typeClass = (Class<T>) this.getType().getDataUnitType().getTypeClass(); + SparkSession sparkSession = SparkSession.builder().sparkContext(sparkExecutor.sc.sc()).getOrCreate(); + SQLContext sqlContext = sparkSession.sqlContext(); + + Dataset<Row> df; + if (typeClass == Record.class) { + // Records need manual schema handling + if (recordRDD.isEmpty()) { + return ExecutionOperator.modelEagerExecution(inputs, outputs, operatorContext); + } + Record first = (Record) recordRDD.first(); Review Comment: For `Record` inputs, this does two Spark actions (`isEmpty()` and then `first()`), which triggers two jobs and can be expensive. Prefer a single `take(1)` (or similar) to both detect emptiness and obtain a sample record for schema inference. ```suggestion List<T> sample = recordRDD.take(1); if (sample.isEmpty()) { return ExecutionOperator.modelEagerExecution(inputs, outputs, operatorContext); } Record first = (Record) sample.get(0); ``` ########## wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/util/SqlTypeUtils.java: ########## @@ -0,0 +1,187 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.wayang.basic.util; + +import org.apache.calcite.sql.SqlDialect; +import org.apache.wayang.basic.data.Record; + +import java.lang.reflect.Field; +import java.sql.Date; +import java.sql.Timestamp; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * Utility for mapping Java types to SQL types across different dialects. 
+ */ +public class SqlTypeUtils { + + private static final Map<SqlDialect.DatabaseProduct, Map<Class<?>, String>> dialectTypeMaps = new HashMap<>(); + + static { + // Default mappings (Standard SQL) + Map<Class<?>, String> defaultMap = new HashMap<>(); + defaultMap.put(Integer.class, "INT"); + defaultMap.put(int.class, "INT"); + defaultMap.put(Long.class, "BIGINT"); + defaultMap.put(long.class, "BIGINT"); + defaultMap.put(Double.class, "DOUBLE"); + defaultMap.put(double.class, "DOUBLE"); + defaultMap.put(Float.class, "FLOAT"); + defaultMap.put(float.class, "FLOAT"); + defaultMap.put(Boolean.class, "BOOLEAN"); + defaultMap.put(boolean.class, "BOOLEAN"); + defaultMap.put(String.class, "VARCHAR(255)"); + defaultMap.put(Date.class, "DATE"); + defaultMap.put(LocalDate.class, "DATE"); + defaultMap.put(Timestamp.class, "TIMESTAMP"); + defaultMap.put(LocalDateTime.class, "TIMESTAMP"); + + dialectTypeMaps.put(SqlDialect.DatabaseProduct.UNKNOWN, defaultMap); + + // PostgreSQL Overrides + Map<Class<?>, String> pgMap = new HashMap<>(defaultMap); + pgMap.put(Double.class, "DOUBLE PRECISION"); + pgMap.put(double.class, "DOUBLE PRECISION"); + dialectTypeMaps.put(SqlDialect.DatabaseProduct.POSTGRESQL, pgMap); + + // Add more dialects here as needed (MySQL, Oracle, etc.) + } + + /** + * Detects the database product from a JDBC URL. 
+ * + * @param url JDBC URL + * @return detected DatabaseProduct + */ + public static SqlDialect.DatabaseProduct detectProduct(String url) { + if (url == null) + return SqlDialect.DatabaseProduct.UNKNOWN; + String lowerUrl = url.toLowerCase(); + if (lowerUrl.contains("postgresql") || lowerUrl.contains("postgres")) + return SqlDialect.DatabaseProduct.POSTGRESQL; + if (lowerUrl.contains("mysql")) + return SqlDialect.DatabaseProduct.MYSQL; + if (lowerUrl.contains("oracle")) + return SqlDialect.DatabaseProduct.ORACLE; + if (lowerUrl.contains("sqlite")) { + try { + return SqlDialect.DatabaseProduct.valueOf("SQLITE"); + } catch (Exception e) { + return SqlDialect.DatabaseProduct.UNKNOWN; + } + } + if (lowerUrl.contains("h2")) + return SqlDialect.DatabaseProduct.H2; + if (lowerUrl.contains("derby")) + return SqlDialect.DatabaseProduct.DERBY; + if (lowerUrl.contains("mssql") || lowerUrl.contains("sqlserver")) + return SqlDialect.DatabaseProduct.MSSQL; + return SqlDialect.DatabaseProduct.UNKNOWN; + } + + /** + * Returns the SQL type for a given Java class and database product. + * + * @param cls Java class + * @param product database product + * @return SQL type string + */ + public static String getSqlType(Class<?> cls, SqlDialect.DatabaseProduct product) { + Map<Class<?>, String> typeMap = dialectTypeMaps.getOrDefault(product, + dialectTypeMaps.get(SqlDialect.DatabaseProduct.UNKNOWN)); + return typeMap.getOrDefault(cls, "VARCHAR(255)"); + } + + /** + * Extracts schema information from a POJO class or a Record. 
+ * + * @param cls POJO class + * @param product database product + * @return a list of schema fields + */ + public static List<SchemaField> getSchema(Class<?> cls, SqlDialect.DatabaseProduct product) { + List<SchemaField> schema = new ArrayList<>(); + if (cls == Record.class) { + // For Record.class without an instance, we can't derive names/types easily + // Users should use the instance-based getSchema or provide columnNames + return schema; + } + + for (Field field : cls.getDeclaredFields()) { + if (java.lang.reflect.Modifier.isStatic(field.getModifiers())) { + continue; + } + schema.add(new SchemaField(field.getName(), field.getType(), getSqlType(field.getType(), product))); + } + return schema; Review Comment: POJO schema derivation uses `getDeclaredFields()`, but value extraction uses `ReflectionUtils.getProperty(...)`, which requires matching `getX()` getters. This will fail for private fields without getters (even though they are included in the schema) and can also include fields that should not be persisted. Consider deriving schema from JavaBeans properties/getters (or at least filtering to fields that have corresponding getters). ########## wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/operators/JavaTableSink.java: ########## @@ -0,0 +1,246 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.wayang.java.operators; + +import org.apache.wayang.basic.data.Record; +import org.apache.wayang.basic.operators.TableSink; +import org.apache.wayang.basic.util.SqlTypeUtils; +import org.apache.wayang.core.optimizer.OptimizationContext; +import org.apache.wayang.core.plan.wayangplan.ExecutionOperator; +import org.apache.wayang.core.platform.ChannelDescriptor; +import org.apache.wayang.core.platform.ChannelInstance; +import org.apache.wayang.core.platform.lineage.ExecutionLineageNode; +import org.apache.wayang.core.types.DataSetType; +import org.apache.wayang.core.util.ReflectionUtils; +import org.apache.wayang.core.util.Tuple; +import org.apache.wayang.java.channels.CollectionChannel; +import org.apache.wayang.java.channels.JavaChannelInstance; +import org.apache.wayang.java.channels.StreamChannel; +import org.apache.wayang.java.execution.JavaExecutor; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.Arrays; +import java.util.Collection; +import java.util.Iterator; +import java.util.List; +import java.util.Properties; + +public class JavaTableSink<T> extends TableSink<T> implements JavaExecutionOperator { + + private void setRecordValue(PreparedStatement ps, int index, Object value) throws SQLException { + if (value == null) { + ps.setNull(index, java.sql.Types.NULL); + } else if (value instanceof Integer) { + ps.setInt(index, (Integer) value); + } else if (value instanceof Long) { + ps.setLong(index, 
(Long) value); + } else if (value instanceof Double) { + ps.setDouble(index, (Double) value); + } else if (value instanceof Float) { + ps.setFloat(index, (Float) value); + } else if (value instanceof Boolean) { + ps.setBoolean(index, (Boolean) value); + } else if (value instanceof java.sql.Date) { + ps.setDate(index, (java.sql.Date) value); + } else if (value instanceof java.sql.Timestamp) { + ps.setTimestamp(index, (java.sql.Timestamp) value); + } else { + ps.setString(index, value.toString()); + } + } + + public JavaTableSink(Properties props, String mode, String tableName) { + this(props, mode, tableName, null); + } + + public JavaTableSink(Properties props, String mode, String tableName, String... columnNames) { + super(props, mode, tableName, columnNames); + + } + + public JavaTableSink(Properties props, String mode, String tableName, String[] columnNames, DataSetType<T> type) { + super(props, mode, tableName, columnNames, type); + + } + + public JavaTableSink(TableSink<T> that) { + super(that); + } + + @Override + public Tuple<Collection<ExecutionLineageNode>, Collection<ChannelInstance>> evaluate( + ChannelInstance[] inputs, + ChannelInstance[] outputs, + JavaExecutor javaExecutor, + OptimizationContext.OperatorContext operatorContext) { + assert inputs.length == 1; + assert outputs.length == 0; + JavaChannelInstance input = (JavaChannelInstance) inputs[0]; + + // The stream is converted to an Iterator so that we can read the first element + // w/o consuming the entire stream. + Iterator<T> recordIterator = input.<T>provideStream().iterator(); + + if (!recordIterator.hasNext()) { + return ExecutionOperator.modelEagerExecution(inputs, outputs, operatorContext); + } + + // We read the first element to derive the Record schema. 
+ T firstElement = recordIterator.next(); + Class<?> typeClass = this.getType().getDataUnitType().getTypeClass(); + + String url = this.getProperties().getProperty("url"); + org.apache.calcite.sql.SqlDialect.DatabaseProduct product = SqlTypeUtils.detectProduct(url); + + List<SqlTypeUtils.SchemaField> schemaFields; + if (typeClass != Record.class) { + schemaFields = SqlTypeUtils.getSchema(typeClass, product); + } else { + schemaFields = SqlTypeUtils.getSchema((Record) firstElement, product, this.getColumnNames()); + } + + String[] currentColumnNames = this.getColumnNames(); + if (currentColumnNames == null || currentColumnNames.length == 0) { + currentColumnNames = new String[schemaFields.size()]; + for (int i = 0; i < schemaFields.size(); i++) { + currentColumnNames[i] = schemaFields.get(i).getName(); + } + this.setColumnNames(currentColumnNames); + } + + String[] sqlTypes = new String[currentColumnNames.length]; + for (int i = 0; i < currentColumnNames.length; i++) { + sqlTypes[i] = "VARCHAR(255)"; // Default + for (SqlTypeUtils.SchemaField field : schemaFields) { + if (field.getName().equals(currentColumnNames[i])) { + sqlTypes[i] = field.getSqlType(); + break; + } + } + } + + final String[] finalColumnNames = currentColumnNames; + final String[] finalSqlTypes = sqlTypes; + + this.getProperties().setProperty("streamingBatchInsert", "True"); + + Connection conn; + try { + Class.forName(this.getProperties().getProperty("driver")); + conn = DriverManager.getConnection(this.getProperties().getProperty("url"), this.getProperties()); + conn.setAutoCommit(false); + + Statement stmt = conn.createStatement(); + + // Drop existing table if the mode is 'overwrite'. + if (this.getMode().equals("overwrite")) { + stmt.execute("DROP TABLE IF EXISTS " + this.getTableName()); + } + + // Create a new table if the specified table name does not exist yet. 
+ StringBuilder sb = new StringBuilder(); + sb.append("CREATE TABLE IF NOT EXISTS ").append(this.getTableName()).append(" ("); + String separator = ""; + for (int i = 0; i < finalColumnNames.length; i++) { + sb.append(separator).append("\"").append(finalColumnNames[i]).append("\" ").append(finalSqlTypes[i]); + separator = ", "; + } + sb.append(")"); + stmt.execute(sb.toString()); + + // Create a prepared statement to insert value from the recordIterator. + sb = new StringBuilder(); + sb.append("INSERT INTO ").append(this.getTableName()).append(" ("); + separator = ""; + for (int i = 0; i < finalColumnNames.length; i++) { + sb.append(separator).append("\"").append(finalColumnNames[i]).append("\""); + separator = ", "; + } + sb.append(") VALUES ("); + separator = ""; + for (int i = 0; i < finalColumnNames.length; i++) { + sb.append(separator).append("?"); + separator = ", "; + } + sb.append(")"); + PreparedStatement ps = conn.prepareStatement(sb.toString()); + + // The schema Record has to be pushed to the database too. + this.pushToStatement(ps, firstElement, typeClass, finalColumnNames); + ps.addBatch(); + + // Iterate through all remaining records and add them to the prepared statement + recordIterator.forEachRemaining( + r -> { + try { + this.pushToStatement(ps, r, typeClass, finalColumnNames); + ps.addBatch(); + } catch (SQLException e) { + e.printStackTrace(); + } + }); + + ps.executeBatch(); + conn.commit(); + conn.close(); + } catch (ClassNotFoundException e) { + System.out.println("Please specify a correct database driver."); + e.printStackTrace(); + } catch (SQLException e) { + e.printStackTrace(); Review Comment: JDBC resources (`Connection`, `Statement`, `PreparedStatement`) are not managed with try-with-resources, so an exception before `conn.close()` will leak connections/statements and may leave an open transaction. 
Use try-with-resources (or a `finally`) for `Connection`, `Statement`, and `PreparedStatement`, and rollback on failure when `autoCommit=false`. ```suggestion try (PreparedStatement ps = conn.prepareStatement(sb.toString())) { // The schema Record has to be pushed to the database too. this.pushToStatement(ps, firstElement, typeClass, finalColumnNames); ps.addBatch(); // Iterate through all remaining records and add them to the prepared statement recordIterator.forEachRemaining( r -> { try { this.pushToStatement(ps, r, typeClass, finalColumnNames); ps.addBatch(); } catch (SQLException e) { e.printStackTrace(); } }); ps.executeBatch(); conn.commit(); } } catch (ClassNotFoundException e) { System.out.println("Please specify a correct database driver."); e.printStackTrace(); } catch (SQLException e) { try { if (conn != null && !conn.getAutoCommit()) { conn.rollback(); } } catch (SQLException rollbackException) { rollbackException.printStackTrace(); } e.printStackTrace(); } finally { if (conn != null) { try { conn.close(); } catch (SQLException closeException) { closeException.printStackTrace(); } } ``` ########## wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/operators/SparkTableSink.java: ########## @@ -0,0 +1,185 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.wayang.spark.operators; + +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.RowFactory; +import org.apache.spark.sql.SQLContext; +import org.apache.spark.sql.SaveMode; +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.types.DataTypes; +import org.apache.spark.sql.types.Metadata; +import org.apache.spark.sql.types.StructField; +import org.apache.spark.sql.types.StructType; +import org.apache.wayang.basic.data.Record; +import org.apache.wayang.basic.operators.TableSink; +import org.apache.wayang.basic.util.SqlTypeUtils; +import org.apache.wayang.core.api.exception.WayangException; +import org.apache.wayang.core.optimizer.OptimizationContext; +import org.apache.wayang.core.plan.wayangplan.ExecutionOperator; +import org.apache.wayang.core.platform.ChannelDescriptor; +import org.apache.wayang.core.platform.ChannelInstance; +import org.apache.wayang.core.platform.lineage.ExecutionLineageNode; +import org.apache.wayang.core.types.DataSetType; +import org.apache.wayang.core.util.Tuple; +import org.apache.wayang.spark.channels.RddChannel; +import org.apache.wayang.spark.execution.SparkExecutor; + +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.Properties; + +public class SparkTableSink<T> extends TableSink<T> implements SparkExecutionOperator { + + private SaveMode mode; + + public SparkTableSink(Properties props, String mode, String tableName, String... 
columnNames) { + super(props, mode, tableName, columnNames); + this.setMode(mode); + } + + public SparkTableSink(Properties props, String mode, String tableName, String[] columnNames, DataSetType<T> type) { + super(props, mode, tableName, columnNames, type); + this.setMode(mode); + } + + public SparkTableSink(TableSink<T> that) { + super(that); + this.setMode(that.getMode()); + } + + @Override + public Tuple<Collection<ExecutionLineageNode>, Collection<ChannelInstance>> evaluate( + ChannelInstance[] inputs, + ChannelInstance[] outputs, + SparkExecutor sparkExecutor, + OptimizationContext.OperatorContext operatorContext) { + assert inputs.length == 1; + assert outputs.length == 0; + + JavaRDD<T> recordRDD = ((RddChannel.Instance) inputs[0]).provideRdd(); + Class<T> typeClass = (Class<T>) this.getType().getDataUnitType().getTypeClass(); + SparkSession sparkSession = SparkSession.builder().sparkContext(sparkExecutor.sc.sc()).getOrCreate(); + SQLContext sqlContext = sparkSession.sqlContext(); + + Dataset<Row> df; + if (typeClass == Record.class) { + // Records need manual schema handling + if (recordRDD.isEmpty()) { + return ExecutionOperator.modelEagerExecution(inputs, outputs, operatorContext); + } + Record first = (Record) recordRDD.first(); + + // Centralized Schema Derivation + List<SqlTypeUtils.SchemaField> schemaFields = SqlTypeUtils.getSchema(first, + SqlTypeUtils.detectProduct(this.getProperties().getProperty("url")), + this.getColumnNames()); + + // Map Record to Row + JavaRDD<Row> rowRDD = recordRDD.map(rec -> RowFactory.create(((Record) rec).getValues())); + + // Build Spark Schema + StructField[] fields = new StructField[schemaFields.size()]; + for (int i = 0; i < schemaFields.size(); i++) { + SqlTypeUtils.SchemaField sf = schemaFields.get(i); + org.apache.spark.sql.types.DataType sparkType = getSparkDataType(sf.getJavaClass()); + fields[i] = new StructField(sf.getName(), sparkType, true, Metadata.empty()); + } + + // Update column names in the operator if 
they were generated + String[] newColNames = schemaFields.stream().map(SqlTypeUtils.SchemaField::getName).toArray(String[]::new); + this.setColumnNames(newColNames); + + df = sqlContext.createDataFrame(rowRDD, new StructType(fields)); + } else { + // POJO Case: Let Spark handle it natively + df = sqlContext.createDataFrame(recordRDD, typeClass); + // If columnNames are provided, we should probably select/rename them, + // but usually createDataFrame(rdd, beanClass) maps fields to columns. + if (this.getColumnNames() != null && this.getColumnNames().length > 0) { + // Optionally filter or reorder columns to match this.getColumnNames() + // For now, Spark's native mapping is preferred. + } + } + + this.getProperties().setProperty("batchSize", "250000"); + df.write() + .mode(this.mode) + .jdbc(this.getProperties().getProperty("url"), this.getTableName(), this.getProperties()); Review Comment: `this.getProperties().setProperty("batchSize", "250000")` mutates the user-provided JDBC `Properties` and unconditionally overrides any caller-supplied batch size. Prefer not mutating shared config, or only applying a default when the option is absent (and ideally via a copy of the `Properties` passed to `.jdbc(...)`). ```suggestion // Create a defensive copy of the JDBC properties to avoid mutating caller-provided config. Properties jdbcProps = new Properties(); jdbcProps.putAll(this.getProperties()); // Apply a default batchSize only if the caller did not specify one. 
if (!jdbcProps.containsKey("batchSize")) { jdbcProps.setProperty("batchSize", "250000"); } df.write() .mode(this.mode) .jdbc(jdbcProps.getProperty("url"), this.getTableName(), jdbcProps); ``` ########## wayang-platforms/wayang-java/pom.xml: ########## @@ -78,7 +78,19 @@ <artifactId>log4j-slf4j-impl</artifactId> <version>2.20.0</version> </dependency> + <dependency> + <groupId>org.postgresql</groupId> + <artifactId>postgresql</artifactId> + <version>42.7.2</version> + <scope>test</scope> + </dependency> Review Comment: `org.postgresql:postgresql` is added as a test dependency in the Java platform module, but the new tests use H2 and there is no usage of the PostgreSQL driver in this module’s test sources. If it’s not needed for other tests, removing it will reduce build time/dependency surface; otherwise, consider adding a test that actually covers PostgreSQL-specific behavior. ```suggestion ``` ########## wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/operators/JavaTableSink.java: ########## @@ -0,0 +1,246 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.wayang.java.operators; + +import org.apache.wayang.basic.data.Record; +import org.apache.wayang.basic.operators.TableSink; +import org.apache.wayang.basic.util.SqlTypeUtils; +import org.apache.wayang.core.optimizer.OptimizationContext; +import org.apache.wayang.core.plan.wayangplan.ExecutionOperator; +import org.apache.wayang.core.platform.ChannelDescriptor; +import org.apache.wayang.core.platform.ChannelInstance; +import org.apache.wayang.core.platform.lineage.ExecutionLineageNode; +import org.apache.wayang.core.types.DataSetType; +import org.apache.wayang.core.util.ReflectionUtils; +import org.apache.wayang.core.util.Tuple; +import org.apache.wayang.java.channels.CollectionChannel; +import org.apache.wayang.java.channels.JavaChannelInstance; +import org.apache.wayang.java.channels.StreamChannel; +import org.apache.wayang.java.execution.JavaExecutor; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.Arrays; +import java.util.Collection; +import java.util.Iterator; +import java.util.List; +import java.util.Properties; + +public class JavaTableSink<T> extends TableSink<T> implements JavaExecutionOperator { + + private void setRecordValue(PreparedStatement ps, int index, Object value) throws SQLException { + if (value == null) { + ps.setNull(index, java.sql.Types.NULL); + } else if (value instanceof Integer) { + ps.setInt(index, (Integer) value); + } else if (value instanceof Long) { + ps.setLong(index, (Long) value); + } else if (value instanceof Double) { + ps.setDouble(index, (Double) value); + } else if (value instanceof Float) { + ps.setFloat(index, (Float) value); + } else if (value instanceof Boolean) { + ps.setBoolean(index, (Boolean) value); + } else if (value instanceof java.sql.Date) { + ps.setDate(index, (java.sql.Date) value); + } else if (value instanceof java.sql.Timestamp) { + ps.setTimestamp(index, 
(java.sql.Timestamp) value); + } else { + ps.setString(index, value.toString()); + } + } + + public JavaTableSink(Properties props, String mode, String tableName) { + this(props, mode, tableName, null); + } + + public JavaTableSink(Properties props, String mode, String tableName, String... columnNames) { + super(props, mode, tableName, columnNames); + + } + + public JavaTableSink(Properties props, String mode, String tableName, String[] columnNames, DataSetType<T> type) { + super(props, mode, tableName, columnNames, type); + + } + + public JavaTableSink(TableSink<T> that) { + super(that); + } + + @Override + public Tuple<Collection<ExecutionLineageNode>, Collection<ChannelInstance>> evaluate( + ChannelInstance[] inputs, + ChannelInstance[] outputs, + JavaExecutor javaExecutor, + OptimizationContext.OperatorContext operatorContext) { + assert inputs.length == 1; + assert outputs.length == 0; + JavaChannelInstance input = (JavaChannelInstance) inputs[0]; + + // The stream is converted to an Iterator so that we can read the first element + // w/o consuming the entire stream. + Iterator<T> recordIterator = input.<T>provideStream().iterator(); + + if (!recordIterator.hasNext()) { + return ExecutionOperator.modelEagerExecution(inputs, outputs, operatorContext); + } + + // We read the first element to derive the Record schema. 
+ T firstElement = recordIterator.next(); + Class<?> typeClass = this.getType().getDataUnitType().getTypeClass(); + + String url = this.getProperties().getProperty("url"); + org.apache.calcite.sql.SqlDialect.DatabaseProduct product = SqlTypeUtils.detectProduct(url); + + List<SqlTypeUtils.SchemaField> schemaFields; + if (typeClass != Record.class) { + schemaFields = SqlTypeUtils.getSchema(typeClass, product); + } else { + schemaFields = SqlTypeUtils.getSchema((Record) firstElement, product, this.getColumnNames()); + } + + String[] currentColumnNames = this.getColumnNames(); + if (currentColumnNames == null || currentColumnNames.length == 0) { + currentColumnNames = new String[schemaFields.size()]; + for (int i = 0; i < schemaFields.size(); i++) { + currentColumnNames[i] = schemaFields.get(i).getName(); + } + this.setColumnNames(currentColumnNames); + } + + String[] sqlTypes = new String[currentColumnNames.length]; + for (int i = 0; i < currentColumnNames.length; i++) { + sqlTypes[i] = "VARCHAR(255)"; // Default + for (SqlTypeUtils.SchemaField field : schemaFields) { + if (field.getName().equals(currentColumnNames[i])) { + sqlTypes[i] = field.getSqlType(); + break; + } + } + } + + final String[] finalColumnNames = currentColumnNames; + final String[] finalSqlTypes = sqlTypes; + + this.getProperties().setProperty("streamingBatchInsert", "True"); + + Connection conn; + try { + Class.forName(this.getProperties().getProperty("driver")); + conn = DriverManager.getConnection(this.getProperties().getProperty("url"), this.getProperties()); + conn.setAutoCommit(false); + + Statement stmt = conn.createStatement(); + + // Drop existing table if the mode is 'overwrite'. + if (this.getMode().equals("overwrite")) { + stmt.execute("DROP TABLE IF EXISTS " + this.getTableName()); + } + + // Create a new table if the specified table name does not exist yet. 
+ StringBuilder sb = new StringBuilder(); + sb.append("CREATE TABLE IF NOT EXISTS ").append(this.getTableName()).append(" ("); + String separator = ""; + for (int i = 0; i < finalColumnNames.length; i++) { + sb.append(separator).append("\"").append(finalColumnNames[i]).append("\" ").append(finalSqlTypes[i]); + separator = ", "; + } + sb.append(")"); + stmt.execute(sb.toString()); + + // Create a prepared statement to insert value from the recordIterator. + sb = new StringBuilder(); + sb.append("INSERT INTO ").append(this.getTableName()).append(" ("); + separator = ""; Review Comment: `tableName` is concatenated directly into SQL statements (DROP/CREATE/INSERT). Since it is user-provided, this is vulnerable to SQL injection and also breaks on reserved words/special characters. Please validate identifiers (e.g., strict `[A-Za-z0-9_]+` allowlist) and/or use proper dialect-specific quoting for the table name as well. ########## wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/operators/SparkTableSink.java: ########## @@ -0,0 +1,185 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.wayang.spark.operators; + +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.RowFactory; +import org.apache.spark.sql.SQLContext; +import org.apache.spark.sql.SaveMode; +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.types.DataTypes; +import org.apache.spark.sql.types.Metadata; +import org.apache.spark.sql.types.StructField; +import org.apache.spark.sql.types.StructType; +import org.apache.wayang.basic.data.Record; +import org.apache.wayang.basic.operators.TableSink; +import org.apache.wayang.basic.util.SqlTypeUtils; +import org.apache.wayang.core.api.exception.WayangException; +import org.apache.wayang.core.optimizer.OptimizationContext; +import org.apache.wayang.core.plan.wayangplan.ExecutionOperator; +import org.apache.wayang.core.platform.ChannelDescriptor; +import org.apache.wayang.core.platform.ChannelInstance; +import org.apache.wayang.core.platform.lineage.ExecutionLineageNode; +import org.apache.wayang.core.types.DataSetType; +import org.apache.wayang.core.util.Tuple; +import org.apache.wayang.spark.channels.RddChannel; +import org.apache.wayang.spark.execution.SparkExecutor; + +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.Properties; + +public class SparkTableSink<T> extends TableSink<T> implements SparkExecutionOperator { + + private SaveMode mode; + + public SparkTableSink(Properties props, String mode, String tableName, String... 
columnNames) { + super(props, mode, tableName, columnNames); + this.setMode(mode); + } + + public SparkTableSink(Properties props, String mode, String tableName, String[] columnNames, DataSetType<T> type) { + super(props, mode, tableName, columnNames, type); + this.setMode(mode); + } + + public SparkTableSink(TableSink<T> that) { + super(that); + this.setMode(that.getMode()); + } + + @Override + public Tuple<Collection<ExecutionLineageNode>, Collection<ChannelInstance>> evaluate( + ChannelInstance[] inputs, + ChannelInstance[] outputs, + SparkExecutor sparkExecutor, + OptimizationContext.OperatorContext operatorContext) { + assert inputs.length == 1; + assert outputs.length == 0; + + JavaRDD<T> recordRDD = ((RddChannel.Instance) inputs[0]).provideRdd(); + Class<T> typeClass = (Class<T>) this.getType().getDataUnitType().getTypeClass(); + SparkSession sparkSession = SparkSession.builder().sparkContext(sparkExecutor.sc.sc()).getOrCreate(); + SQLContext sqlContext = sparkSession.sqlContext(); + + Dataset<Row> df; + if (typeClass == Record.class) { + // Records need manual schema handling + if (recordRDD.isEmpty()) { + return ExecutionOperator.modelEagerExecution(inputs, outputs, operatorContext); + } + Record first = (Record) recordRDD.first(); + + // Centralized Schema Derivation + List<SqlTypeUtils.SchemaField> schemaFields = SqlTypeUtils.getSchema(first, + SqlTypeUtils.detectProduct(this.getProperties().getProperty("url")), + this.getColumnNames()); + + // Map Record to Row + JavaRDD<Row> rowRDD = recordRDD.map(rec -> RowFactory.create(((Record) rec).getValues())); + + // Build Spark Schema + StructField[] fields = new StructField[schemaFields.size()]; + for (int i = 0; i < schemaFields.size(); i++) { + SqlTypeUtils.SchemaField sf = schemaFields.get(i); + org.apache.spark.sql.types.DataType sparkType = getSparkDataType(sf.getJavaClass()); + fields[i] = new StructField(sf.getName(), sparkType, true, Metadata.empty()); + } + + // Update column names in the operator if 
they were generated + String[] newColNames = schemaFields.stream().map(SqlTypeUtils.SchemaField::getName).toArray(String[]::new); + this.setColumnNames(newColNames); + + df = sqlContext.createDataFrame(rowRDD, new StructType(fields)); + } else { + // POJO Case: Let Spark handle it natively + df = sqlContext.createDataFrame(recordRDD, typeClass); + // If columnNames are provided, we should probably select/rename them, + // but usually createDataFrame(rdd, beanClass) maps fields to columns. + if (this.getColumnNames() != null && this.getColumnNames().length > 0) { + // Optionally filter or reorder columns to match this.getColumnNames() + // For now, Spark's native mapping is preferred. + } + } + + this.getProperties().setProperty("batchSize", "250000"); + df.write() + .mode(this.mode) + .jdbc(this.getProperties().getProperty("url"), this.getTableName(), this.getProperties()); + + return ExecutionOperator.modelEagerExecution(inputs, outputs, operatorContext); + } + Review Comment: `DataFrameWriter.jdbc(url, tableName, ...)` treats the `tableName`/`dbtable` option as a SQL fragment (Spark allows subqueries like `(select ...) t`), so passing an unchecked user-provided table name can enable SQL injection or unexpected writes. Consider validating/quoting `tableName` (and documenting allowed forms) before passing it to Spark JDBC. ```suggestion .jdbc(this.getProperties().getProperty("url"), sanitizeTableName(this.getTableName()), this.getProperties()); return ExecutionOperator.modelEagerExecution(inputs, outputs, operatorContext); } private static final java.util.regex.Pattern TABLE_NAME_PATTERN = java.util.regex.Pattern.compile("[A-Za-z0-9_\\.]+"); /** * Validate the table name before passing it to Spark's JDBC writer. * Spark treats the `tableName` argument as a SQL fragment, so we restrict it * to a conservative set of characters to avoid SQL injection or unintended writes. 
*/ private static String sanitizeTableName(String tableName) { if (tableName == null || !TABLE_NAME_PATTERN.matcher(tableName).matches()) { throw new WayangException("Illegal table name for Spark JDBC sink: " + tableName); } return tableName; } ``` ########## wayang-platforms/wayang-spark/pom.xml: ########## @@ -121,5 +121,18 @@ <version>4.8</version> </dependency> + <dependency> + <groupId>org.postgresql</groupId> + <artifactId>postgresql</artifactId> + <version>42.7.2</version> + <scope>compile</scope> Review Comment: `org.postgresql:postgresql` is added with `compile` scope in the Spark platform module. This makes the Spark platform depend on a specific JDBC driver even though the sink is intended to be generic JDBC; it also increases artifact size and can cause driver/version conflicts for downstream users. Consider removing it or changing it to `test`/`runtime`/`optional`, letting applications provide their chosen driver. ```suggestion <scope>test</scope> ``` ########## wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/operators/TableSink.java: ########## @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.wayang.basic.operators; + +import org.apache.wayang.basic.data.Record; +import org.apache.wayang.core.plan.wayangplan.UnarySink; +import org.apache.wayang.core.types.DataSetType; + +import java.util.Properties; + +/** + * {@link UnarySink} that writes Records to a database table. + */ + +public class TableSink<T> extends UnarySink<T> { + private final String tableName; + + private String[] columnNames; + + private final Properties props; + + private String mode; + + /** + * Creates a new instance. + * + * @param props database connection properties + * @param mode write mode + * @param tableName name of the table to be written + * @param columnNames names of the columns in the tables + */ + public TableSink(Properties props, String mode, String tableName, String... columnNames) { + this(props, mode, tableName, columnNames, (DataSetType<T>) DataSetType.createDefault(Record.class)); + } + + public TableSink(Properties props, String mode, String tableName, String[] columnNames, DataSetType<T> type) { + super(type); + this.tableName = tableName; + this.columnNames = columnNames; + this.props = props; + this.mode = mode; + } + + /** + * Copies an instance (exclusive of broadcasts). + * + * @param that that should be copied + */ + public TableSink(TableSink<T> that) { + super(that); + this.tableName = that.getTableName(); + this.columnNames = that.getColumnNames(); + this.props = that.getProperties(); + this.mode = that.getMode(); + } Review Comment: The copy constructor keeps references to the original `Properties` and `columnNames` array (`this.props = that.getProperties()` / `this.columnNames = that.getColumnNames()`). Because these are mutable and also exposed via getters, later mutations (e.g., SparkTableSink setting JDBC options or regenerating column names) can unexpectedly affect other operator instances. Consider defensively copying (`new Properties(props)`, `Arrays.copyOf(...)`) in constructors/getters. 
########## wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/util/SqlTypeUtils.java: ########## @@ -0,0 +1,187 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.wayang.basic.util; + +import org.apache.calcite.sql.SqlDialect; +import org.apache.wayang.basic.data.Record; + +import java.lang.reflect.Field; +import java.sql.Date; +import java.sql.Timestamp; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * Utility for mapping Java types to SQL types across different dialects. 
+ */ +public class SqlTypeUtils { + + private static final Map<SqlDialect.DatabaseProduct, Map<Class<?>, String>> dialectTypeMaps = new HashMap<>(); + + static { + // Default mappings (Standard SQL) + Map<Class<?>, String> defaultMap = new HashMap<>(); + defaultMap.put(Integer.class, "INT"); + defaultMap.put(int.class, "INT"); + defaultMap.put(Long.class, "BIGINT"); + defaultMap.put(long.class, "BIGINT"); + defaultMap.put(Double.class, "DOUBLE"); + defaultMap.put(double.class, "DOUBLE"); + defaultMap.put(Float.class, "FLOAT"); + defaultMap.put(float.class, "FLOAT"); + defaultMap.put(Boolean.class, "BOOLEAN"); + defaultMap.put(boolean.class, "BOOLEAN"); + defaultMap.put(String.class, "VARCHAR(255)"); + defaultMap.put(Date.class, "DATE"); + defaultMap.put(LocalDate.class, "DATE"); + defaultMap.put(Timestamp.class, "TIMESTAMP"); + defaultMap.put(LocalDateTime.class, "TIMESTAMP"); + + dialectTypeMaps.put(SqlDialect.DatabaseProduct.UNKNOWN, defaultMap); + + // PostgreSQL Overrides + Map<Class<?>, String> pgMap = new HashMap<>(defaultMap); + pgMap.put(Double.class, "DOUBLE PRECISION"); + pgMap.put(double.class, "DOUBLE PRECISION"); + dialectTypeMaps.put(SqlDialect.DatabaseProduct.POSTGRESQL, pgMap); + + // Add more dialects here as needed (MySQL, Oracle, etc.) + } + + /** + * Detects the database product from a JDBC URL. 
+ * + * @param url JDBC URL + * @return detected DatabaseProduct + */ + public static SqlDialect.DatabaseProduct detectProduct(String url) { + if (url == null) + return SqlDialect.DatabaseProduct.UNKNOWN; + String lowerUrl = url.toLowerCase(); + if (lowerUrl.contains("postgresql") || lowerUrl.contains("postgres")) + return SqlDialect.DatabaseProduct.POSTGRESQL; + if (lowerUrl.contains("mysql")) + return SqlDialect.DatabaseProduct.MYSQL; + if (lowerUrl.contains("oracle")) + return SqlDialect.DatabaseProduct.ORACLE; + if (lowerUrl.contains("sqlite")) { + try { + return SqlDialect.DatabaseProduct.valueOf("SQLITE"); + } catch (Exception e) { + return SqlDialect.DatabaseProduct.UNKNOWN; + } + } + if (lowerUrl.contains("h2")) + return SqlDialect.DatabaseProduct.H2; + if (lowerUrl.contains("derby")) + return SqlDialect.DatabaseProduct.DERBY; + if (lowerUrl.contains("mssql") || lowerUrl.contains("sqlserver")) + return SqlDialect.DatabaseProduct.MSSQL; + return SqlDialect.DatabaseProduct.UNKNOWN; + } + + /** + * Returns the SQL type for a given Java class and database product. + * + * @param cls Java class + * @param product database product + * @return SQL type string + */ + public static String getSqlType(Class<?> cls, SqlDialect.DatabaseProduct product) { + Map<Class<?>, String> typeMap = dialectTypeMaps.getOrDefault(product, + dialectTypeMaps.get(SqlDialect.DatabaseProduct.UNKNOWN)); + return typeMap.getOrDefault(cls, "VARCHAR(255)"); + } + + /** + * Extracts schema information from a POJO class or a Record. 
+ * + * @param cls POJO class + * @param product database product + * @return a list of schema fields + */ + public static List<SchemaField> getSchema(Class<?> cls, SqlDialect.DatabaseProduct product) { + List<SchemaField> schema = new ArrayList<>(); + if (cls == Record.class) { + // For Record.class without an instance, we can't derive names/types easily + // Users should use the instance-based getSchema or provide columnNames + return schema; + } + + for (Field field : cls.getDeclaredFields()) { + if (java.lang.reflect.Modifier.isStatic(field.getModifiers())) { + continue; + } + schema.add(new SchemaField(field.getName(), field.getType(), getSqlType(field.getType(), product))); + } + return schema; Review Comment: `Class#getDeclaredFields()` does not guarantee a stable order across JVMs/compilers, but the derived schema order determines column ordering (and tests assume a specific order). To avoid nondeterministic schemas, sort fields deterministically (e.g., by name) or use a stable property-introspection approach. ########## wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/operators/SparkTableSink.java: ########## @@ -0,0 +1,185 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.wayang.spark.operators; + +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.RowFactory; +import org.apache.spark.sql.SQLContext; +import org.apache.spark.sql.SaveMode; +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.types.DataTypes; +import org.apache.spark.sql.types.Metadata; +import org.apache.spark.sql.types.StructField; +import org.apache.spark.sql.types.StructType; +import org.apache.wayang.basic.data.Record; +import org.apache.wayang.basic.operators.TableSink; +import org.apache.wayang.basic.util.SqlTypeUtils; +import org.apache.wayang.core.api.exception.WayangException; +import org.apache.wayang.core.optimizer.OptimizationContext; +import org.apache.wayang.core.plan.wayangplan.ExecutionOperator; +import org.apache.wayang.core.platform.ChannelDescriptor; +import org.apache.wayang.core.platform.ChannelInstance; +import org.apache.wayang.core.platform.lineage.ExecutionLineageNode; +import org.apache.wayang.core.types.DataSetType; +import org.apache.wayang.core.util.Tuple; +import org.apache.wayang.spark.channels.RddChannel; +import org.apache.wayang.spark.execution.SparkExecutor; + +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.Properties; + +public class SparkTableSink<T> extends TableSink<T> implements SparkExecutionOperator { + + private SaveMode mode; + + public SparkTableSink(Properties props, String mode, String tableName, String... 
columnNames) { + super(props, mode, tableName, columnNames); + this.setMode(mode); + } + + public SparkTableSink(Properties props, String mode, String tableName, String[] columnNames, DataSetType<T> type) { + super(props, mode, tableName, columnNames, type); + this.setMode(mode); + } + + public SparkTableSink(TableSink<T> that) { + super(that); + this.setMode(that.getMode()); + } + + @Override + public Tuple<Collection<ExecutionLineageNode>, Collection<ChannelInstance>> evaluate( + ChannelInstance[] inputs, + ChannelInstance[] outputs, + SparkExecutor sparkExecutor, + OptimizationContext.OperatorContext operatorContext) { + assert inputs.length == 1; + assert outputs.length == 0; + + JavaRDD<T> recordRDD = ((RddChannel.Instance) inputs[0]).provideRdd(); + Class<T> typeClass = (Class<T>) this.getType().getDataUnitType().getTypeClass(); + SparkSession sparkSession = SparkSession.builder().sparkContext(sparkExecutor.sc.sc()).getOrCreate(); + SQLContext sqlContext = sparkSession.sqlContext(); + + Dataset<Row> df; + if (typeClass == Record.class) { + // Records need manual schema handling + if (recordRDD.isEmpty()) { + return ExecutionOperator.modelEagerExecution(inputs, outputs, operatorContext); + } + Record first = (Record) recordRDD.first(); + + // Centralized Schema Derivation + List<SqlTypeUtils.SchemaField> schemaFields = SqlTypeUtils.getSchema(first, + SqlTypeUtils.detectProduct(this.getProperties().getProperty("url")), + this.getColumnNames()); + + // Map Record to Row + JavaRDD<Row> rowRDD = recordRDD.map(rec -> RowFactory.create(((Record) rec).getValues())); + + // Build Spark Schema + StructField[] fields = new StructField[schemaFields.size()]; + for (int i = 0; i < schemaFields.size(); i++) { + SqlTypeUtils.SchemaField sf = schemaFields.get(i); + org.apache.spark.sql.types.DataType sparkType = getSparkDataType(sf.getJavaClass()); + fields[i] = new StructField(sf.getName(), sparkType, true, Metadata.empty()); + } + + // Update column names in the operator if 
they were generated + String[] newColNames = schemaFields.stream().map(SqlTypeUtils.SchemaField::getName).toArray(String[]::new); + this.setColumnNames(newColNames); + + df = sqlContext.createDataFrame(rowRDD, new StructType(fields)); + } else { + // POJO Case: Let Spark handle it natively + df = sqlContext.createDataFrame(recordRDD, typeClass); + // If columnNames are provided, we should probably select/rename them, + // but usually createDataFrame(rdd, beanClass) maps fields to columns. + if (this.getColumnNames() != null && this.getColumnNames().length > 0) { + // Optionally filter or reorder columns to match this.getColumnNames() + // For now, Spark's native mapping is preferred. Review Comment: In the POJO branch, `columnNames` passed to `TableSink` are silently ignored (the code comments mention it but no behavior is applied). This makes the API misleading because callers might expect renaming/reordering or subset selection. Either implement the mapping (select/rename) or validate and reject `columnNames` for POJO inputs with a clear error. ```suggestion // For POJOs, we currently do not support custom columnNames to avoid // ambiguous or misleading mappings. Fail fast if they are provided. String[] columnNames = this.getColumnNames(); if (columnNames != null && columnNames.length > 0) { throw new WayangException( "columnNames are not supported for POJO inputs in SparkTableSink. " + "Either omit columnNames or use Record inputs if you need custom column mapping."); ``` ########## wayang-platforms/wayang-spark/src/main/java/org/apache/wayang/spark/operators/SparkTableSink.java: ########## @@ -0,0 +1,185 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.wayang.spark.operators; + +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.RowFactory; +import org.apache.spark.sql.SQLContext; +import org.apache.spark.sql.SaveMode; +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.types.DataTypes; +import org.apache.spark.sql.types.Metadata; +import org.apache.spark.sql.types.StructField; +import org.apache.spark.sql.types.StructType; +import org.apache.wayang.basic.data.Record; +import org.apache.wayang.basic.operators.TableSink; +import org.apache.wayang.basic.util.SqlTypeUtils; +import org.apache.wayang.core.api.exception.WayangException; +import org.apache.wayang.core.optimizer.OptimizationContext; +import org.apache.wayang.core.plan.wayangplan.ExecutionOperator; +import org.apache.wayang.core.platform.ChannelDescriptor; +import org.apache.wayang.core.platform.ChannelInstance; +import org.apache.wayang.core.platform.lineage.ExecutionLineageNode; +import org.apache.wayang.core.types.DataSetType; +import org.apache.wayang.core.util.Tuple; +import org.apache.wayang.spark.channels.RddChannel; +import org.apache.wayang.spark.execution.SparkExecutor; + +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.Properties; + +public class SparkTableSink<T> extends TableSink<T> 
implements SparkExecutionOperator { + + private SaveMode mode; + + public SparkTableSink(Properties props, String mode, String tableName, String... columnNames) { + super(props, mode, tableName, columnNames); + this.setMode(mode); + } + + public SparkTableSink(Properties props, String mode, String tableName, String[] columnNames, DataSetType<T> type) { + super(props, mode, tableName, columnNames, type); + this.setMode(mode); + } + + public SparkTableSink(TableSink<T> that) { + super(that); + this.setMode(that.getMode()); + } + + @Override + public Tuple<Collection<ExecutionLineageNode>, Collection<ChannelInstance>> evaluate( + ChannelInstance[] inputs, + ChannelInstance[] outputs, + SparkExecutor sparkExecutor, + OptimizationContext.OperatorContext operatorContext) { + assert inputs.length == 1; + assert outputs.length == 0; + + JavaRDD<T> recordRDD = ((RddChannel.Instance) inputs[0]).provideRdd(); + Class<T> typeClass = (Class<T>) this.getType().getDataUnitType().getTypeClass(); + SparkSession sparkSession = SparkSession.builder().sparkContext(sparkExecutor.sc.sc()).getOrCreate(); + SQLContext sqlContext = sparkSession.sqlContext(); + + Dataset<Row> df; + if (typeClass == Record.class) { + // Records need manual schema handling + if (recordRDD.isEmpty()) { + return ExecutionOperator.modelEagerExecution(inputs, outputs, operatorContext); + } + Record first = (Record) recordRDD.first(); + + // Centralized Schema Derivation + List<SqlTypeUtils.SchemaField> schemaFields = SqlTypeUtils.getSchema(first, + SqlTypeUtils.detectProduct(this.getProperties().getProperty("url")), + this.getColumnNames()); + + // Map Record to Row + JavaRDD<Row> rowRDD = recordRDD.map(rec -> RowFactory.create(((Record) rec).getValues())); + + // Build Spark Schema + StructField[] fields = new StructField[schemaFields.size()]; + for (int i = 0; i < schemaFields.size(); i++) { + SqlTypeUtils.SchemaField sf = schemaFields.get(i); + org.apache.spark.sql.types.DataType sparkType = 
getSparkDataType(sf.getJavaClass()); + fields[i] = new StructField(sf.getName(), sparkType, true, Metadata.empty()); + } + + // Update column names in the operator if they were generated + String[] newColNames = schemaFields.stream().map(SqlTypeUtils.SchemaField::getName).toArray(String[]::new); + this.setColumnNames(newColNames); + Review Comment: `this.setColumnNames(...)` mutates the operator based on runtime data (first record). Combined with `TableSink` copying `columnNames` by reference, this can leak schema decisions across copies/reuses. Prefer keeping inferred column names local to the evaluation (or ensure defensive copies in `TableSink` and avoid mutating shared state). ```suggestion ``` ########## wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/operators/JavaTableSink.java: ########## @@ -0,0 +1,246 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.wayang.java.operators; + +import org.apache.wayang.basic.data.Record; +import org.apache.wayang.basic.operators.TableSink; +import org.apache.wayang.basic.util.SqlTypeUtils; +import org.apache.wayang.core.optimizer.OptimizationContext; +import org.apache.wayang.core.plan.wayangplan.ExecutionOperator; +import org.apache.wayang.core.platform.ChannelDescriptor; +import org.apache.wayang.core.platform.ChannelInstance; +import org.apache.wayang.core.platform.lineage.ExecutionLineageNode; +import org.apache.wayang.core.types.DataSetType; +import org.apache.wayang.core.util.ReflectionUtils; +import org.apache.wayang.core.util.Tuple; +import org.apache.wayang.java.channels.CollectionChannel; +import org.apache.wayang.java.channels.JavaChannelInstance; +import org.apache.wayang.java.channels.StreamChannel; +import org.apache.wayang.java.execution.JavaExecutor; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.Arrays; +import java.util.Collection; +import java.util.Iterator; +import java.util.List; +import java.util.Properties; + +public class JavaTableSink<T> extends TableSink<T> implements JavaExecutionOperator { + + private void setRecordValue(PreparedStatement ps, int index, Object value) throws SQLException { + if (value == null) { + ps.setNull(index, java.sql.Types.NULL); + } else if (value instanceof Integer) { + ps.setInt(index, (Integer) value); + } else if (value instanceof Long) { + ps.setLong(index, (Long) value); + } else if (value instanceof Double) { + ps.setDouble(index, (Double) value); + } else if (value instanceof Float) { + ps.setFloat(index, (Float) value); + } else if (value instanceof Boolean) { + ps.setBoolean(index, (Boolean) value); + } else if (value instanceof java.sql.Date) { + ps.setDate(index, (java.sql.Date) value); + } else if (value instanceof java.sql.Timestamp) { + ps.setTimestamp(index, 
(java.sql.Timestamp) value); + } else { + ps.setString(index, value.toString()); + } + } + + public JavaTableSink(Properties props, String mode, String tableName) { + this(props, mode, tableName, null); + } + + public JavaTableSink(Properties props, String mode, String tableName, String... columnNames) { + super(props, mode, tableName, columnNames); + + } + + public JavaTableSink(Properties props, String mode, String tableName, String[] columnNames, DataSetType<T> type) { + super(props, mode, tableName, columnNames, type); + + } + + public JavaTableSink(TableSink<T> that) { + super(that); + } + + @Override + public Tuple<Collection<ExecutionLineageNode>, Collection<ChannelInstance>> evaluate( + ChannelInstance[] inputs, + ChannelInstance[] outputs, + JavaExecutor javaExecutor, + OptimizationContext.OperatorContext operatorContext) { + assert inputs.length == 1; + assert outputs.length == 0; + JavaChannelInstance input = (JavaChannelInstance) inputs[0]; + + // The stream is converted to an Iterator so that we can read the first element + // w/o consuming the entire stream. + Iterator<T> recordIterator = input.<T>provideStream().iterator(); + + if (!recordIterator.hasNext()) { + return ExecutionOperator.modelEagerExecution(inputs, outputs, operatorContext); + } + + // We read the first element to derive the Record schema. 
+ T firstElement = recordIterator.next(); + Class<?> typeClass = this.getType().getDataUnitType().getTypeClass(); + + String url = this.getProperties().getProperty("url"); + org.apache.calcite.sql.SqlDialect.DatabaseProduct product = SqlTypeUtils.detectProduct(url); + + List<SqlTypeUtils.SchemaField> schemaFields; + if (typeClass != Record.class) { + schemaFields = SqlTypeUtils.getSchema(typeClass, product); + } else { + schemaFields = SqlTypeUtils.getSchema((Record) firstElement, product, this.getColumnNames()); + } + + String[] currentColumnNames = this.getColumnNames(); + if (currentColumnNames == null || currentColumnNames.length == 0) { + currentColumnNames = new String[schemaFields.size()]; + for (int i = 0; i < schemaFields.size(); i++) { + currentColumnNames[i] = schemaFields.get(i).getName(); + } + this.setColumnNames(currentColumnNames); + } + + String[] sqlTypes = new String[currentColumnNames.length]; + for (int i = 0; i < currentColumnNames.length; i++) { + sqlTypes[i] = "VARCHAR(255)"; // Default + for (SqlTypeUtils.SchemaField field : schemaFields) { + if (field.getName().equals(currentColumnNames[i])) { + sqlTypes[i] = field.getSqlType(); + break; + } + } + } + + final String[] finalColumnNames = currentColumnNames; + final String[] finalSqlTypes = sqlTypes; + + this.getProperties().setProperty("streamingBatchInsert", "True"); + + Connection conn; + try { + Class.forName(this.getProperties().getProperty("driver")); + conn = DriverManager.getConnection(this.getProperties().getProperty("url"), this.getProperties()); + conn.setAutoCommit(false); + + Statement stmt = conn.createStatement(); + + // Drop existing table if the mode is 'overwrite'. + if (this.getMode().equals("overwrite")) { + stmt.execute("DROP TABLE IF EXISTS " + this.getTableName()); + } + + // Create a new table if the specified table name does not exist yet. 
+ StringBuilder sb = new StringBuilder(); + sb.append("CREATE TABLE IF NOT EXISTS ").append(this.getTableName()).append(" ("); + String separator = ""; + for (int i = 0; i < finalColumnNames.length; i++) { + sb.append(separator).append("\"").append(finalColumnNames[i]).append("\" ").append(finalSqlTypes[i]); + separator = ", "; + } + sb.append(")"); + stmt.execute(sb.toString()); + + // Create a prepared statement to insert value from the recordIterator. + sb = new StringBuilder(); + sb.append("INSERT INTO ").append(this.getTableName()).append(" ("); + separator = ""; + for (int i = 0; i < finalColumnNames.length; i++) { + sb.append(separator).append("\"").append(finalColumnNames[i]).append("\""); + separator = ", "; + } + sb.append(") VALUES ("); + separator = ""; + for (int i = 0; i < finalColumnNames.length; i++) { + sb.append(separator).append("?"); + separator = ", "; + } + sb.append(")"); + PreparedStatement ps = conn.prepareStatement(sb.toString()); + + // The schema Record has to be pushed to the database too. + this.pushToStatement(ps, firstElement, typeClass, finalColumnNames); + ps.addBatch(); + + // Iterate through all remaining records and add them to the prepared statement + recordIterator.forEachRemaining( + r -> { + try { + this.pushToStatement(ps, r, typeClass, finalColumnNames); + ps.addBatch(); + } catch (SQLException e) { + e.printStackTrace(); + } + }); + + ps.executeBatch(); + conn.commit(); + conn.close(); + } catch (ClassNotFoundException e) { + System.out.println("Please specify a correct database driver."); + e.printStackTrace(); + } catch (SQLException e) { + e.printStackTrace(); + } Review Comment: Exceptions during driver loading / JDBC operations are currently handled with `System.out.println` + `printStackTrace()` and then the operator continues and returns success. This makes sink failures easy to miss. Please throw a `WayangException` (and ensure any open transaction is rolled back/connection closed) so failures fail the execution. 
########## wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/operators/JavaTableSink.java: ########## @@ -0,0 +1,246 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.wayang.java.operators; + +import org.apache.wayang.basic.data.Record; +import org.apache.wayang.basic.operators.TableSink; +import org.apache.wayang.basic.util.SqlTypeUtils; +import org.apache.wayang.core.optimizer.OptimizationContext; +import org.apache.wayang.core.plan.wayangplan.ExecutionOperator; +import org.apache.wayang.core.platform.ChannelDescriptor; +import org.apache.wayang.core.platform.ChannelInstance; +import org.apache.wayang.core.platform.lineage.ExecutionLineageNode; +import org.apache.wayang.core.types.DataSetType; +import org.apache.wayang.core.util.ReflectionUtils; +import org.apache.wayang.core.util.Tuple; +import org.apache.wayang.java.channels.CollectionChannel; +import org.apache.wayang.java.channels.JavaChannelInstance; +import org.apache.wayang.java.channels.StreamChannel; +import org.apache.wayang.java.execution.JavaExecutor; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.sql.Statement; 
+import java.util.Arrays; +import java.util.Collection; +import java.util.Iterator; +import java.util.List; +import java.util.Properties; + +public class JavaTableSink<T> extends TableSink<T> implements JavaExecutionOperator { + + private void setRecordValue(PreparedStatement ps, int index, Object value) throws SQLException { + if (value == null) { + ps.setNull(index, java.sql.Types.NULL); + } else if (value instanceof Integer) { + ps.setInt(index, (Integer) value); + } else if (value instanceof Long) { + ps.setLong(index, (Long) value); + } else if (value instanceof Double) { + ps.setDouble(index, (Double) value); + } else if (value instanceof Float) { + ps.setFloat(index, (Float) value); + } else if (value instanceof Boolean) { + ps.setBoolean(index, (Boolean) value); + } else if (value instanceof java.sql.Date) { + ps.setDate(index, (java.sql.Date) value); + } else if (value instanceof java.sql.Timestamp) { + ps.setTimestamp(index, (java.sql.Timestamp) value); + } else { + ps.setString(index, value.toString()); + } + } + + public JavaTableSink(Properties props, String mode, String tableName) { + this(props, mode, tableName, null); + } + + public JavaTableSink(Properties props, String mode, String tableName, String... 
columnNames) { + super(props, mode, tableName, columnNames); + + } + + public JavaTableSink(Properties props, String mode, String tableName, String[] columnNames, DataSetType<T> type) { + super(props, mode, tableName, columnNames, type); + + } + + public JavaTableSink(TableSink<T> that) { + super(that); + } + + @Override + public Tuple<Collection<ExecutionLineageNode>, Collection<ChannelInstance>> evaluate( + ChannelInstance[] inputs, + ChannelInstance[] outputs, + JavaExecutor javaExecutor, + OptimizationContext.OperatorContext operatorContext) { + assert inputs.length == 1; + assert outputs.length == 0; + JavaChannelInstance input = (JavaChannelInstance) inputs[0]; + + // The stream is converted to an Iterator so that we can read the first element + // w/o consuming the entire stream. + Iterator<T> recordIterator = input.<T>provideStream().iterator(); + + if (!recordIterator.hasNext()) { + return ExecutionOperator.modelEagerExecution(inputs, outputs, operatorContext); + } + + // We read the first element to derive the Record schema. 
+ T firstElement = recordIterator.next(); + Class<?> typeClass = this.getType().getDataUnitType().getTypeClass(); + + String url = this.getProperties().getProperty("url"); + org.apache.calcite.sql.SqlDialect.DatabaseProduct product = SqlTypeUtils.detectProduct(url); + + List<SqlTypeUtils.SchemaField> schemaFields; + if (typeClass != Record.class) { + schemaFields = SqlTypeUtils.getSchema(typeClass, product); + } else { + schemaFields = SqlTypeUtils.getSchema((Record) firstElement, product, this.getColumnNames()); + } + + String[] currentColumnNames = this.getColumnNames(); + if (currentColumnNames == null || currentColumnNames.length == 0) { + currentColumnNames = new String[schemaFields.size()]; + for (int i = 0; i < schemaFields.size(); i++) { + currentColumnNames[i] = schemaFields.get(i).getName(); + } + this.setColumnNames(currentColumnNames); + } + + String[] sqlTypes = new String[currentColumnNames.length]; + for (int i = 0; i < currentColumnNames.length; i++) { + sqlTypes[i] = "VARCHAR(255)"; // Default + for (SqlTypeUtils.SchemaField field : schemaFields) { + if (field.getName().equals(currentColumnNames[i])) { + sqlTypes[i] = field.getSqlType(); + break; + } + } + } + + final String[] finalColumnNames = currentColumnNames; + final String[] finalSqlTypes = sqlTypes; + + this.getProperties().setProperty("streamingBatchInsert", "True"); + + Connection conn; + try { + Class.forName(this.getProperties().getProperty("driver")); + conn = DriverManager.getConnection(this.getProperties().getProperty("url"), this.getProperties()); + conn.setAutoCommit(false); + + Statement stmt = conn.createStatement(); + + // Drop existing table if the mode is 'overwrite'. + if (this.getMode().equals("overwrite")) { + stmt.execute("DROP TABLE IF EXISTS " + this.getTableName()); + } + + // Create a new table if the specified table name does not exist yet. 
+ StringBuilder sb = new StringBuilder(); + sb.append("CREATE TABLE IF NOT EXISTS ").append(this.getTableName()).append(" ("); + String separator = ""; + for (int i = 0; i < finalColumnNames.length; i++) { + sb.append(separator).append("\"").append(finalColumnNames[i]).append("\" ").append(finalSqlTypes[i]); + separator = ", "; + } + sb.append(")"); + stmt.execute(sb.toString()); + + // Create a prepared statement to insert value from the recordIterator. + sb = new StringBuilder(); + sb.append("INSERT INTO ").append(this.getTableName()).append(" ("); + separator = ""; + for (int i = 0; i < finalColumnNames.length; i++) { + sb.append(separator).append("\"").append(finalColumnNames[i]).append("\""); + separator = ", "; + } + sb.append(") VALUES ("); + separator = ""; + for (int i = 0; i < finalColumnNames.length; i++) { + sb.append(separator).append("?"); + separator = ", "; + } + sb.append(")"); + PreparedStatement ps = conn.prepareStatement(sb.toString()); + + // The schema Record has to be pushed to the database too. 
+ this.pushToStatement(ps, firstElement, typeClass, finalColumnNames); + ps.addBatch(); + + // Iterate through all remaining records and add them to the prepared statement + recordIterator.forEachRemaining( + r -> { + try { + this.pushToStatement(ps, r, typeClass, finalColumnNames); + ps.addBatch(); + } catch (SQLException e) { + e.printStackTrace(); + } + }); + + ps.executeBatch(); + conn.commit(); + conn.close(); + } catch (ClassNotFoundException e) { + System.out.println("Please specify a correct database driver."); + e.printStackTrace(); + } catch (SQLException e) { + e.printStackTrace(); + } + + return ExecutionOperator.modelEagerExecution(inputs, outputs, operatorContext); + } + + private void pushToStatement(PreparedStatement ps, T element, Class<?> typeClass, String[] columnNames) + throws SQLException { + if (typeClass == Record.class) { + Record r = (Record) element; + for (int i = 0; i < columnNames.length; i++) { + setRecordValue(ps, i + 1, r.getField(i)); + } + } else { Review Comment: For `Record` inputs, the code assumes `columnNames.length <= record.size()` and indexes fields by position. If `columnNames` is longer than the `Record`, `r.getField(i)` will throw at runtime. Please validate the lengths early (and give a clear error) or derive `columnNames` from the record size when not provided. ########## wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/operators/JavaTableSink.java: ########## @@ -0,0 +1,246 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.wayang.java.operators; + +import org.apache.wayang.basic.data.Record; +import org.apache.wayang.basic.operators.TableSink; +import org.apache.wayang.basic.util.SqlTypeUtils; +import org.apache.wayang.core.optimizer.OptimizationContext; +import org.apache.wayang.core.plan.wayangplan.ExecutionOperator; +import org.apache.wayang.core.platform.ChannelDescriptor; +import org.apache.wayang.core.platform.ChannelInstance; +import org.apache.wayang.core.platform.lineage.ExecutionLineageNode; +import org.apache.wayang.core.types.DataSetType; +import org.apache.wayang.core.util.ReflectionUtils; +import org.apache.wayang.core.util.Tuple; +import org.apache.wayang.java.channels.CollectionChannel; +import org.apache.wayang.java.channels.JavaChannelInstance; +import org.apache.wayang.java.channels.StreamChannel; +import org.apache.wayang.java.execution.JavaExecutor; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.Arrays; +import java.util.Collection; +import java.util.Iterator; +import java.util.List; +import java.util.Properties; + +public class JavaTableSink<T> extends TableSink<T> implements JavaExecutionOperator { + + private void setRecordValue(PreparedStatement ps, int index, Object value) throws SQLException { + if (value == null) { + ps.setNull(index, java.sql.Types.NULL); + } else if (value instanceof Integer) { + ps.setInt(index, (Integer) value); + } else if (value instanceof Long) { + ps.setLong(index, 
(Long) value); + } else if (value instanceof Double) { + ps.setDouble(index, (Double) value); + } else if (value instanceof Float) { + ps.setFloat(index, (Float) value); + } else if (value instanceof Boolean) { + ps.setBoolean(index, (Boolean) value); + } else if (value instanceof java.sql.Date) { + ps.setDate(index, (java.sql.Date) value); + } else if (value instanceof java.sql.Timestamp) { + ps.setTimestamp(index, (java.sql.Timestamp) value); + } else { + ps.setString(index, value.toString()); + } + } + + public JavaTableSink(Properties props, String mode, String tableName) { + this(props, mode, tableName, null); + } + + public JavaTableSink(Properties props, String mode, String tableName, String... columnNames) { + super(props, mode, tableName, columnNames); + + } + + public JavaTableSink(Properties props, String mode, String tableName, String[] columnNames, DataSetType<T> type) { + super(props, mode, tableName, columnNames, type); + + } + + public JavaTableSink(TableSink<T> that) { + super(that); + } + + @Override + public Tuple<Collection<ExecutionLineageNode>, Collection<ChannelInstance>> evaluate( + ChannelInstance[] inputs, + ChannelInstance[] outputs, + JavaExecutor javaExecutor, + OptimizationContext.OperatorContext operatorContext) { + assert inputs.length == 1; + assert outputs.length == 0; + JavaChannelInstance input = (JavaChannelInstance) inputs[0]; + + // The stream is converted to an Iterator so that we can read the first element + // w/o consuming the entire stream. + Iterator<T> recordIterator = input.<T>provideStream().iterator(); + + if (!recordIterator.hasNext()) { + return ExecutionOperator.modelEagerExecution(inputs, outputs, operatorContext); + } + + // We read the first element to derive the Record schema. 
+ T firstElement = recordIterator.next(); + Class<?> typeClass = this.getType().getDataUnitType().getTypeClass(); + + String url = this.getProperties().getProperty("url"); + org.apache.calcite.sql.SqlDialect.DatabaseProduct product = SqlTypeUtils.detectProduct(url); + + List<SqlTypeUtils.SchemaField> schemaFields; + if (typeClass != Record.class) { + schemaFields = SqlTypeUtils.getSchema(typeClass, product); + } else { + schemaFields = SqlTypeUtils.getSchema((Record) firstElement, product, this.getColumnNames()); + } + + String[] currentColumnNames = this.getColumnNames(); + if (currentColumnNames == null || currentColumnNames.length == 0) { + currentColumnNames = new String[schemaFields.size()]; + for (int i = 0; i < schemaFields.size(); i++) { + currentColumnNames[i] = schemaFields.get(i).getName(); + } + this.setColumnNames(currentColumnNames); + } + + String[] sqlTypes = new String[currentColumnNames.length]; + for (int i = 0; i < currentColumnNames.length; i++) { + sqlTypes[i] = "VARCHAR(255)"; // Default + for (SqlTypeUtils.SchemaField field : schemaFields) { + if (field.getName().equals(currentColumnNames[i])) { + sqlTypes[i] = field.getSqlType(); + break; + } + } + } + + final String[] finalColumnNames = currentColumnNames; + final String[] finalSqlTypes = sqlTypes; + + this.getProperties().setProperty("streamingBatchInsert", "True"); + + Connection conn; + try { + Class.forName(this.getProperties().getProperty("driver")); + conn = DriverManager.getConnection(this.getProperties().getProperty("url"), this.getProperties()); + conn.setAutoCommit(false); + + Statement stmt = conn.createStatement(); + + // Drop existing table if the mode is 'overwrite'. + if (this.getMode().equals("overwrite")) { + stmt.execute("DROP TABLE IF EXISTS " + this.getTableName()); + } + + // Create a new table if the specified table name does not exist yet. 
+ StringBuilder sb = new StringBuilder(); + sb.append("CREATE TABLE IF NOT EXISTS ").append(this.getTableName()).append(" ("); + String separator = ""; + for (int i = 0; i < finalColumnNames.length; i++) { + sb.append(separator).append("\"").append(finalColumnNames[i]).append("\" ").append(finalSqlTypes[i]); + separator = ", "; + } + sb.append(")"); + stmt.execute(sb.toString()); + + // Create a prepared statement to insert value from the recordIterator. + sb = new StringBuilder(); + sb.append("INSERT INTO ").append(this.getTableName()).append(" ("); + separator = ""; + for (int i = 0; i < finalColumnNames.length; i++) { + sb.append(separator).append("\"").append(finalColumnNames[i]).append("\""); + separator = ", "; + } + sb.append(") VALUES ("); + separator = ""; + for (int i = 0; i < finalColumnNames.length; i++) { + sb.append(separator).append("?"); + separator = ", "; + } + sb.append(")"); + PreparedStatement ps = conn.prepareStatement(sb.toString()); + + // The schema Record has to be pushed to the database too. + this.pushToStatement(ps, firstElement, typeClass, finalColumnNames); + ps.addBatch(); + + // Iterate through all remaining records and add them to the prepared statement + recordIterator.forEachRemaining( + r -> { + try { + this.pushToStatement(ps, r, typeClass, finalColumnNames); + ps.addBatch(); + } catch (SQLException e) { + e.printStackTrace(); + } + }); Review Comment: `recordIterator.forEachRemaining` swallows `SQLException` via `printStackTrace()` and keeps building the batch, which can lead to partial/incorrect writes without failing the operator. Propagate the failure (e.g., wrap in a runtime exception), then abort/rollback and surface it as a `WayangException` so the job fails deterministically. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: [email protected]. For queries about this service, please contact Infrastructure at: [email protected].
