This is an automated email from the ASF dual-hosted git repository. gaojun2048 pushed a commit to branch revert-2854-feature/transform-spark-cache in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
commit 39b3497124adf28e53b983115edfdac5f2a7f45a Author: Eric <[email protected]> AuthorDate: Mon Nov 21 21:18:41 2022 +0800 Revert "[Feature] [Plugin] add spark cache transform (#2854)" This reverts commit 4d430942f45a2a5a089b3a04645f0ba1b1cbf310. --- docs/en/transform/cache.md | 56 ---------------- seatunnel-core/seatunnel-spark-starter/pom.xml | 7 -- .../fake/fakesource_cache_to_console.conf | 74 ---------------------- .../seatunnel-transforms-spark/pom.xml | 1 - .../seatunnel-transform-spark-cache/pom.xml | 32 ---------- .../org.apache.seatunnel.spark.BaseSparkTransform | 18 ------ .../apache/seatunnel/spark/transform/Cache.scala | 42 ------------ 7 files changed, 230 deletions(-) diff --git a/docs/en/transform/cache.md b/docs/en/transform/cache.md deleted file mode 100644 index 4b3306477..000000000 --- a/docs/en/transform/cache.md +++ /dev/null @@ -1,56 +0,0 @@ -# cache - -> cache transform plugin - -## Description - -Supports using Cache in data integration by the transform. - -:::tip - -This transform **ONLY** supported by Spark. - -::: - -## Options - -| name | type | required | default value | -| -------------- | ----------- | -------- | ------------- | -| storage_level | string | false | MEMORY_ONLY | - - -### storage_level [string] - -One of the most important capabilities in Spark is persisting (or caching) a dataset in memory across operations. When you persist an RDD, each node stores any partitions of it that it computes in memory and reuses them in other actions on that dataset (or datasets derived from it). This allows future actions to be much faster (often by more than 10x). Caching is a key tool for iterative algorithms and fast interactive use. - -| Storage Level | Meaning | -| -------------- | ------------- | -| MEMORY_ONLY | Store RDD as deserialized Java objects in the JVM. If the RDD does not fit in memory, some partitions will not be cached and will be recomputed on the fly each time they're needed. 
This is the default level.| -| MEMORY_AND_DISK | Store RDD as deserialized Java objects in the JVM. If the RDD does not fit in memory, store the partitions that don't fit on disk, and read them from there when they're needed.| -| MEMORY_ONLY_SER (Java and Scala) | Store RDD as serialized Java objects (one byte array per partition). This is generally more space-efficient than deserialized objects, especially when using a fast serializer, but more CPU-intensive to read.| -| MEMORY_AND_DISK_SER (Java and Scala) | Similar to MEMORY_ONLY_SER, but spill partitions that don't fit in memory to disk instead of recomputing them on the fly each time they're needed.| -| DISK_ONLY | Store the RDD partitions only on disk.| -| MEMORY_ONLY_2, MEMORY_AND_DISK_2, etc. | Same as the levels above, but replicate each partition on two cluster nodes.| -| OFF_HEAP (experimental) | Similar to MEMORY_ONLY_SER, but store the data in off-heap memory. This requires off-heap memory to be enabled.| - -For more details, please refer to [https://spark.apache.org/docs/2.4.8/rdd-programming-guide.html#rdd-persistence] - - -### common options [string] - -Transform plugin common parameters, please refer to [Transform Plugin](common-options.mdx) for details - -## Examples - -```bash - - cache { - result_table_name="temp_cache" - } - - cache { - storage_level = "MEMORY_ONLY" - result_table_name="temp_cache" - } - -``` diff --git a/seatunnel-core/seatunnel-spark-starter/pom.xml b/seatunnel-core/seatunnel-spark-starter/pom.xml index 75f4ae187..7ff006fac 100644 --- a/seatunnel-core/seatunnel-spark-starter/pom.xml +++ b/seatunnel-core/seatunnel-spark-starter/pom.xml @@ -101,13 +101,6 @@ <artifactId>seatunnel-transform-spark-uuid</artifactId> <version>${project.version}</version> </dependency> - - <dependency> - <groupId>org.apache.seatunnel</groupId> - <artifactId>seatunnel-transform-spark-cache</artifactId> - <version>${project.version}</version> - </dependency> - </dependencies> <build> diff --git 
a/seatunnel-e2e/seatunnel-spark-e2e/seatunnel-connector-spark-fake-e2e/src/test/resources/fake/fakesource_cache_to_console.conf b/seatunnel-e2e/seatunnel-spark-e2e/seatunnel-connector-spark-fake-e2e/src/test/resources/fake/fakesource_cache_to_console.conf deleted file mode 100644 index 38e7b92e3..000000000 --- a/seatunnel-e2e/seatunnel-spark-e2e/seatunnel-connector-spark-fake-e2e/src/test/resources/fake/fakesource_cache_to_console.conf +++ /dev/null @@ -1,74 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
-# -###### -###### This config file is a demonstration of streaming processing in seatunnel config -###### - -env { - # You can set spark configuration here - # see available properties defined by spark: https://spark.apache.org/docs/latest/configuration.html#available-properties - spark.app.name = "SeaTunnel" - spark.executor.instances = 2 - spark.executor.cores = 1 - spark.executor.memory = "1g" - spark.master = local -} - -source { - # This is a example input plugin **only for test and demonstrate the feature input plugin** - Fake { - result_table_name = "my_dataset" - } - - # You can also use other input plugins, such as hdfs - # hdfs { - # result_table_name = "accesslog" - # path = "hdfs://hadoop-cluster-01/nginx/accesslog" - # format = "json" - # } - - # If you would like to get more information about how to configure seatunnel and see full list of input plugins, - # please go to https://seatunnel.apache.org/docs/spark/configuration/source-plugins/Fake -} - -transform { - # split data by specific delimiter - - # you can also use other transform plugins, such as sql - # sql { - # sql = "select * from accesslog where request_time > 1000" - # } - cache { - storage_level = "MEMORY_ONLY" - } - # If you would like to get more information about how to configure seatunnel and see full list of transform plugins, - # please go to https://seatunnel.apache.org/docs/spark/configuration/transform-plugins/Split -} - -sink { - # choose stdout output plugin to output data to console - Console {} - - # you can also you other output plugins, such as sql - # hdfs { - # path = "hdfs://hadoop-cluster-01/nginx/accesslog_processed" - # save_mode = "append" - # } - - # If you would like to get more information about how to configure seatunnel and see full list of output plugins, - # please go to https://seatunnel.apache.org/docs/spark/configuration/sink-plugins/Console -} \ No newline at end of file diff --git a/seatunnel-transforms/seatunnel-transforms-spark/pom.xml 
b/seatunnel-transforms/seatunnel-transforms-spark/pom.xml index b7f68ed6f..daa8cd9c0 100644 --- a/seatunnel-transforms/seatunnel-transforms-spark/pom.xml +++ b/seatunnel-transforms/seatunnel-transforms-spark/pom.xml @@ -31,7 +31,6 @@ <packaging>pom</packaging> <modules> - <module>seatunnel-transform-spark-cache</module> <module>seatunnel-transform-spark-json</module> <module>seatunnel-transform-spark-split</module> <module>seatunnel-transform-spark-replace</module> diff --git a/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-cache/pom.xml b/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-cache/pom.xml deleted file mode 100644 index da2e2c148..000000000 --- a/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-cache/pom.xml +++ /dev/null @@ -1,32 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- - - Licensed to the Apache Software Foundation (ASF) under one or more - contributor license agreements. See the NOTICE file distributed with - this work for additional information regarding copyright ownership. - The ASF licenses this file to You under the Apache License, Version 2.0 - (the "License"); you may not use this file except in compliance with - the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. 
- ---> -<project xmlns="http://maven.apache.org/POM/4.0.0" - xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> - <parent> - <groupId>org.apache.seatunnel</groupId> - <artifactId>seatunnel-transforms-spark</artifactId> - <version>${revision}</version> - </parent> - <modelVersion>4.0.0</modelVersion> - - <artifactId>seatunnel-transform-spark-cache</artifactId> - -</project> diff --git a/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-cache/src/main/resources/META-INF/services/org.apache.seatunnel.spark.BaseSparkTransform b/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-cache/src/main/resources/META-INF/services/org.apache.seatunnel.spark.BaseSparkTransform deleted file mode 100644 index 199c8b163..000000000 --- a/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-cache/src/main/resources/META-INF/services/org.apache.seatunnel.spark.BaseSparkTransform +++ /dev/null @@ -1,18 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
-# - -org.apache.seatunnel.spark.transform.Cache diff --git a/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-cache/src/main/scala/org/apache/seatunnel/spark/transform/Cache.scala b/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-cache/src/main/scala/org/apache/seatunnel/spark/transform/Cache.scala deleted file mode 100644 index e7afd7f74..000000000 --- a/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-cache/src/main/scala/org/apache/seatunnel/spark/transform/Cache.scala +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.seatunnel.spark.transform - -import org.apache.seatunnel.common.config.CheckResult -import org.apache.seatunnel.spark.{BaseSparkTransform, SparkEnvironment} -import org.apache.spark.sql.{Dataset, Row} -import org.apache.spark.storage.StorageLevel - -class Cache extends BaseSparkTransform { - - override def process(data: Dataset[Row], env: SparkEnvironment): Dataset[Row] = { - if (config.hasPath("storage_level")) { - val storageLevel = config.getString("storage_level") - data.persist(StorageLevel.fromString(storageLevel)) - } else { - data.cache() - } - } - - override def checkConfig(): CheckResult = { - CheckResult.success() - } - - override def getPluginName: String = "cache" - -}