yx91490 commented on a change in pull request #1188: URL: https://github.com/apache/incubator-seatunnel/pull/1188#discussion_r800131606
########## File path: seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/SparkEnvironment.java ########## @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.spark; + +import org.apache.seatunnel.common.config.CheckResult; +import org.apache.seatunnel.env.RuntimeEnv; + +import org.apache.seatunnel.shade.com.typesafe.config.Config; + +import org.apache.spark.SparkConf; +import org.apache.spark.sql.SparkSession; +import org.apache.spark.streaming.Seconds; +import org.apache.spark.streaming.StreamingContext; + +public class SparkEnvironment implements RuntimeEnv { + + private final long sparkStreamingDuration = 5; Review comment: IntelliJ IDEA warns `Field can be converted to a local variable`; alternatively, add a `static` modifier. Besides, it would be better to rename it to `DEFAULT_SPARK_STREAMING_DURATION`. ########## File path: seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/BaseSparkSink.java ########## @@ -14,23 +14,38 @@ * See the License for the specific language governing permissions and * limitations under the License.
*/ -package org.apache.seatunnel.spark -import org.apache.seatunnel.apis.BaseSink -import org.apache.seatunnel.shade.com.typesafe.config.{Config, ConfigFactory} -import org.apache.spark.sql.{Dataset, Row} +package org.apache.seatunnel.spark; + +import org.apache.seatunnel.apis.BaseSink; +import org.apache.seatunnel.common.config.CheckResult; + +import org.apache.seatunnel.shade.com.typesafe.config.Config; +import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory; + +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; /** * a base interface indicates a sink plugin running on Spark. */ -trait BaseSparkSink[OUT] extends BaseSink[SparkEnvironment] { +public abstract class BaseSparkSink<OUT> implements BaseSink<SparkEnvironment> { + + protected Config config = ConfigFactory.empty(); - protected var config: Config = ConfigFactory.empty() + @Override + public void setConfig(Config config) { + this.config = config; + } - override def setConfig(config: Config): Unit = this.config = config + @Override + public Config getConfig() { + return config; + } - override def getConfig: Config = config + public abstract CheckResult checkConfig(); Review comment: This seems no different from the parent class's method, so it can be removed. ########## File path: seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/batch/SparkBatchExecution.java ########## @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.spark.batch; + +import org.apache.seatunnel.common.config.CheckResult; +import org.apache.seatunnel.common.config.ConfigRuntimeException; +import org.apache.seatunnel.env.Execution; +import org.apache.seatunnel.spark.BaseSparkSink; +import org.apache.seatunnel.spark.BaseSparkSource; +import org.apache.seatunnel.spark.BaseSparkTransform; +import org.apache.seatunnel.spark.SparkEnvironment; + +import org.apache.seatunnel.shade.com.typesafe.config.Config; +import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory; + +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; + +import java.util.List; + +public class SparkBatchExecution implements Execution<SparkBatchSource, BaseSparkTransform, SparkBatchSink> { + + public static final String SOURCE_TABLE_NAME = "source_table_name"; + public static final String RESULT_TABLE_NAME = "result_table_name"; + + public static void registerTempView(String tableName, Dataset<Row> ds) { Review comment: Keep this consistent: change it to the `private` modifier. ########## File path: seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/batch/SparkBatchExecution.java ########## @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.spark.batch; + +import org.apache.seatunnel.common.config.CheckResult; +import org.apache.seatunnel.common.config.ConfigRuntimeException; +import org.apache.seatunnel.env.Execution; +import org.apache.seatunnel.spark.BaseSparkSink; +import org.apache.seatunnel.spark.BaseSparkSource; +import org.apache.seatunnel.spark.BaseSparkTransform; +import org.apache.seatunnel.spark.SparkEnvironment; + +import org.apache.seatunnel.shade.com.typesafe.config.Config; +import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory; + +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; + +import java.util.List; + +public class SparkBatchExecution implements Execution<SparkBatchSource, BaseSparkTransform, SparkBatchSink> { + + public static final String SOURCE_TABLE_NAME = "source_table_name"; + public static final String RESULT_TABLE_NAME = "result_table_name"; + + public static void registerTempView(String tableName, Dataset<Row> ds) { + ds.createOrReplaceGlobalTempView(tableName); + } + + public static void registerInputTempView(BaseSparkSource<Dataset<Row>> source, SparkEnvironment environment) { + Config config = source.getConfig(); + if (config.hasPath(SparkBatchExecution.RESULT_TABLE_NAME)) { + String tableName = config.getString(SparkBatchExecution.RESULT_TABLE_NAME); + registerTempView(tableName, source.getData(environment)); + } else { 
+ throw new ConfigRuntimeException( + "Plugin[" + source.getClass().getName() + "] must be registered as dataset/table, please set \"result_table_name\" config"); + } + } + + public static Dataset<Row> transformProcess(SparkEnvironment environment, BaseSparkTransform transform, Dataset<Row> ds) { + Dataset<Row> fromDs; + Config config = transform.getConfig(); + if (config.hasPath(SparkBatchExecution.SOURCE_TABLE_NAME)) { + String sourceTableName = config.getString(SparkBatchExecution.SOURCE_TABLE_NAME); + fromDs = environment.getSparkSession().read().table(sourceTableName); + } else { + fromDs = ds; + } + return transform.process(fromDs, environment); + } + + public static void registerTransformTempView(BaseSparkTransform transform, Dataset<Row> ds) { + Config config = transform.getConfig(); + if (config.hasPath(SparkBatchExecution.RESULT_TABLE_NAME)) { + config.getString(SparkBatchExecution.RESULT_TABLE_NAME); + registerTempView(RESULT_TABLE_NAME, ds); Review comment: There is a bug with the table name: the value returned by `config.getString(SparkBatchExecution.RESULT_TABLE_NAME)` is discarded, so the view is registered under the constant `RESULT_TABLE_NAME` (the literal name `result_table_name`) instead of the configured table name. ########## File path: seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/batch/SparkBatchSink.java ########## @@ -14,12 +14,17 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.seatunnel.spark.batch -import org.apache.seatunnel.spark.BaseSparkSink +package org.apache.seatunnel.spark.batch; + +import org.apache.seatunnel.spark.BaseSparkSink; + +import scala.Unit; /** * a SparkBatchSink plugin will write data to other system * using Spark DataSet API.
*/ -trait SparkBatchSink extends BaseSparkSink[Unit] {} +@SuppressWarnings("PMD.AbstractClassShouldStartWithAbstractNamingRule") +public abstract class SparkBatchSink extends BaseSparkSink<Unit> { Review comment: It would be better to change `<Unit>` to `<Void>`. ########## File path: seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/batch/SparkBatchExecution.java ########## @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.seatunnel.spark.batch; + +import org.apache.seatunnel.common.config.CheckResult; +import org.apache.seatunnel.common.config.ConfigRuntimeException; +import org.apache.seatunnel.env.Execution; +import org.apache.seatunnel.spark.BaseSparkSink; +import org.apache.seatunnel.spark.BaseSparkSource; +import org.apache.seatunnel.spark.BaseSparkTransform; +import org.apache.seatunnel.spark.SparkEnvironment; + +import org.apache.seatunnel.shade.com.typesafe.config.Config; +import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory; + +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; + +import java.util.List; + +public class SparkBatchExecution implements Execution<SparkBatchSource, BaseSparkTransform, SparkBatchSink> { + + public static final String SOURCE_TABLE_NAME = "source_table_name"; + public static final String RESULT_TABLE_NAME = "result_table_name"; + + public static void registerTempView(String tableName, Dataset<Row> ds) { + ds.createOrReplaceGlobalTempView(tableName); + } + + public static void registerInputTempView(BaseSparkSource<Dataset<Row>> source, SparkEnvironment environment) { + Config config = source.getConfig(); + if (config.hasPath(SparkBatchExecution.RESULT_TABLE_NAME)) { + String tableName = config.getString(SparkBatchExecution.RESULT_TABLE_NAME); + registerTempView(tableName, source.getData(environment)); + } else { + throw new ConfigRuntimeException( + "Plugin[" + source.getClass().getName() + "] must be registered as dataset/table, please set \"result_table_name\" config"); + } + } + + public static Dataset<Row> transformProcess(SparkEnvironment environment, BaseSparkTransform transform, Dataset<Row> ds) { + Dataset<Row> fromDs; + Config config = transform.getConfig(); + if (config.hasPath(SparkBatchExecution.SOURCE_TABLE_NAME)) { + String sourceTableName = config.getString(SparkBatchExecution.SOURCE_TABLE_NAME); + fromDs = environment.getSparkSession().read().table(sourceTableName); + } 
else { + fromDs = ds; + } + return transform.process(fromDs, environment); + } + + public static void registerTransformTempView(BaseSparkTransform transform, Dataset<Row> ds) { + Config config = transform.getConfig(); + if (config.hasPath(SparkBatchExecution.RESULT_TABLE_NAME)) { + config.getString(SparkBatchExecution.RESULT_TABLE_NAME); + registerTempView(RESULT_TABLE_NAME, ds); + } + } + + public static void sinkProcess(SparkEnvironment environment, BaseSparkSink<?> sink, Dataset<Row> ds) { + Dataset<Row> fromDs; + Config config = sink.getConfig(); + if (config.hasPath(SparkBatchExecution.SOURCE_TABLE_NAME)) { + String sourceTableName = config.getString(SparkBatchExecution.RESULT_TABLE_NAME); + fromDs = environment.getSparkSession().read().table(sourceTableName); + } else { + fromDs = ds; + } + sink.output(fromDs, environment); + } + + private final SparkEnvironment environment; + + private Config config = ConfigFactory.empty(); + + public SparkBatchExecution(SparkEnvironment environment) { + this.environment = environment; + } Review comment: It would be better to move these fields and the constructor to the top of the class. ########## File path: seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/SparkEnvironment.java ########## @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.spark; + +import org.apache.seatunnel.common.config.CheckResult; +import org.apache.seatunnel.env.RuntimeEnv; + +import org.apache.seatunnel.shade.com.typesafe.config.Config; + +import org.apache.spark.SparkConf; +import org.apache.spark.sql.SparkSession; +import org.apache.spark.streaming.Seconds; +import org.apache.spark.streaming.StreamingContext; + +public class SparkEnvironment implements RuntimeEnv { + + private final long sparkStreamingDuration = 5; + + private SparkSession sparkSession; + + private StreamingContext streamingContext; + + private Config config; Review comment: This differs from the original Scala version, which initialized the field with `ConfigFactory.empty()`. ########## File path: seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/batch/SparkBatchExecution.java ########## @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.seatunnel.spark.batch; + +import org.apache.seatunnel.common.config.CheckResult; +import org.apache.seatunnel.common.config.ConfigRuntimeException; +import org.apache.seatunnel.env.Execution; +import org.apache.seatunnel.spark.BaseSparkSink; +import org.apache.seatunnel.spark.BaseSparkSource; +import org.apache.seatunnel.spark.BaseSparkTransform; +import org.apache.seatunnel.spark.SparkEnvironment; + +import org.apache.seatunnel.shade.com.typesafe.config.Config; +import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory; + +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; + +import java.util.List; + +public class SparkBatchExecution implements Execution<SparkBatchSource, BaseSparkTransform, SparkBatchSink> { + + public static final String SOURCE_TABLE_NAME = "source_table_name"; + public static final String RESULT_TABLE_NAME = "result_table_name"; Review comment: Keep these consistent: change them to the `private` modifier, or use `Plugin.RESULT_TABLE_NAME` instead. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
