This is an automated email from the ASF dual-hosted git repository. bertty pushed a commit to branch WAYANG-34 in repository https://gitbox.apache.org/repos/asf/incubator-wayang.git
commit d0296266715d7201ab56146fd0bae70b98599a0a Author: Bertty Contreras-Rojas <[email protected]> AuthorDate: Mon Sep 27 12:47:30 2021 +0200 [WAYANG-34] add the ObjectFileSink to Platform Spark and Flink Signed-off-by: bertty <[email protected]> --- .../org/apache/wayang/flink/mapping/Mappings.java | 1 + .../flink/mapping/ObjectFileSinkMapping.java | 64 +++++++++++++++++++++ .../flink/operators/FlinkObjectFileSink.java | 27 +++++---- .../wayang/java/operators/JavaObjectFileSink.java | 3 +- .../org/apache/wayang/spark/mapping/Mappings.java | 1 + .../spark/mapping/ObjectFileSinkMapping.java | 66 ++++++++++++++++++++++ .../spark/operators/SparkObjectFileSink.java | 22 +++++--- 7 files changed, 166 insertions(+), 18 deletions(-) diff --git a/wayang-platforms/wayang-flink/code/main/java/org/apache/wayang/flink/mapping/Mappings.java b/wayang-platforms/wayang-flink/code/main/java/org/apache/wayang/flink/mapping/Mappings.java index 2406544..3a89d6c 100644 --- a/wayang-platforms/wayang-flink/code/main/java/org/apache/wayang/flink/mapping/Mappings.java +++ b/wayang-platforms/wayang-flink/code/main/java/org/apache/wayang/flink/mapping/Mappings.java @@ -53,6 +53,7 @@ public class Mappings { new SampleMapping(), new SortMapping(), new TextFileSinkMapping(), + new ObjectFileSinkMapping(), new TextFileSourceMapping(), new UnionAllMapping(), new ZipWithIdMapping() diff --git a/wayang-platforms/wayang-flink/code/main/java/org/apache/wayang/flink/mapping/ObjectFileSinkMapping.java b/wayang-platforms/wayang-flink/code/main/java/org/apache/wayang/flink/mapping/ObjectFileSinkMapping.java new file mode 100644 index 0000000..1008119 --- /dev/null +++ b/wayang-platforms/wayang-flink/code/main/java/org/apache/wayang/flink/mapping/ObjectFileSinkMapping.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.wayang.flink.mapping; + +import java.util.Collection; +import java.util.Collections; +import org.apache.wayang.basic.operators.ObjectFileSink; +import org.apache.wayang.core.mapping.Mapping; +import org.apache.wayang.core.mapping.OperatorPattern; +import org.apache.wayang.core.mapping.PlanTransformation; +import org.apache.wayang.core.mapping.ReplacementSubplanFactory; +import org.apache.wayang.core.mapping.SubplanPattern; +import org.apache.wayang.core.types.DataSetType; +import org.apache.wayang.flink.operators.FlinkObjectFileSink; +import org.apache.wayang.flink.platform.FlinkPlatform; + +/** + * Mapping from {@link ObjectFileSink} to {@link FlinkObjectFileSink}. + */ +public class ObjectFileSinkMapping implements Mapping { + + @Override + public Collection<PlanTransformation> getTransformations() { + return Collections.singleton(new PlanTransformation( + this.createSubplanPattern(), + this.createReplacementSubplanFactory(), + FlinkPlatform.getInstance() + )); + } + + private SubplanPattern createSubplanPattern() { + final OperatorPattern operatorPattern = new OperatorPattern<>( + "sink", + new ObjectFileSink<>( + null, + DataSetType.none().getDataUnitType().getTypeClass() + ), + false + ); + return SubplanPattern.createSingleton(operatorPattern); + } + + private ReplacementSubplanFactory createReplacementSubplanFactory() { + return new ReplacementSubplanFactory.OfSingleOperators<ObjectFileSink<?>>( + (matchedOperator, epoch) -> new FlinkObjectFileSink<>(matchedOperator).at(epoch) + ); + } +} diff --git a/wayang-platforms/wayang-flink/code/main/java/org/apache/wayang/flink/operators/FlinkObjectFileSink.java b/wayang-platforms/wayang-flink/code/main/java/org/apache/wayang/flink/operators/FlinkObjectFileSink.java index 9130a97..6e29ed7 100644 --- a/wayang-platforms/wayang-flink/code/main/java/org/apache/wayang/flink/operators/FlinkObjectFileSink.java +++ b/wayang-platforms/wayang-flink/code/main/java/org/apache/wayang/flink/operators/FlinkObjectFileSink.java @@ -21,6 +21,7 @@ package org.apache.wayang.flink.operators; import org.apache.flink.api.java.operators.DataSink; import org.apache.flink.core.fs.FileSystem; import org.apache.wayang.basic.channels.FileChannel; +import org.apache.wayang.basic.operators.ObjectFileSink; import org.apache.wayang.core.optimizer.OptimizationContext; import org.apache.wayang.core.plan.wayangplan.ExecutionOperator; import org.apache.wayang.core.plan.wayangplan.Operator; @@ -44,18 +45,18 @@ import java.util.List; * * @see FlinkObjectFileSink */ -public class FlinkObjectFileSink<Type> extends UnarySink<Type> implements FlinkExecutionOperator { - - private final String targetPath; +public class FlinkObjectFileSink<Type> extends ObjectFileSink<Type> implements FlinkExecutionOperator { + public FlinkObjectFileSink(ObjectFileSink<Type> that) { + super(that); + } public FlinkObjectFileSink(DataSetType<Type> type) { this(null, type); } public FlinkObjectFileSink(String targetPath, DataSetType<Type> type) { - super(type); - this.targetPath = targetPath; + super(targetPath, type); } @Override @@ -68,10 +69,16 @@ public class FlinkObjectFileSink<Type> extends UnarySink<Type> implements FlinkE assert inputs.length == this.getNumInputs(); assert outputs.length <= 1; - - final FileChannel.Instance output = (FileChannel.Instance) outputs[0]; - final String targetPath = output.addGivenOrTempPath(this.targetPath, flinkExecutor.getConfiguration()); - + final FileChannel.Instance output; + final String targetPath; + if(outputs.length == 1) { + output = (FileChannel.Instance) outputs[0]; + targetPath = output.addGivenOrTempPath(this.textFileUrl, flinkExecutor.getConfiguration()); + }else{ + targetPath = this.textFileUrl; + } + + //TODO: remove the set parallelism 1 DataSetChannel.Instance input = (DataSetChannel.Instance) inputs[0]; final DataSink<Type> tDataSink = input.<Type>provideDataSet() .write(new WayangFileOutputFormat<Type>(targetPath), targetPath, FileSystem.WriteMode.OVERWRITE) @@ -83,7 +90,7 @@ public class FlinkObjectFileSink<Type> extends UnarySink<Type> implements FlinkE @Override protected ExecutionOperator createCopy() { - return new FlinkObjectFileSink<>(targetPath, this.getType()); + return new FlinkObjectFileSink<>(this.textFileUrl, this.getType()); } @Override diff --git a/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/operators/JavaObjectFileSink.java b/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/operators/JavaObjectFileSink.java index 060efcf..8fbd470 100644 --- a/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/operators/JavaObjectFileSink.java +++ b/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/operators/JavaObjectFileSink.java @@ -83,8 +83,9 @@ public class JavaObjectFileSink<T> extends ObjectFileSink<T> implements JavaExec // Prepare Hadoop's SequenceFile.Writer. final String path; + FileChannel.Instance output; if(outputs.length == 1) { - FileChannel.Instance output = (FileChannel.Instance) outputs[0]; + output = (FileChannel.Instance) outputs[0]; path = output.addGivenOrTempPath(this.textFileUrl, javaExecutor.getCompiler().getConfiguration()); }else{ diff --git a/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/mapping/Mappings.java b/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/mapping/Mappings.java index 70ec802..5618e78 100644 --- a/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/mapping/Mappings.java +++ b/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/mapping/Mappings.java @@ -32,6 +32,7 @@ public class Mappings { public static Collection<Mapping> BASIC_MAPPINGS = Arrays.asList( new TextFileSourceMapping(), new TextFileSinkMapping(), + new ObjectFileSinkMapping(), new MapMapping(), new MapPartitionsMapping(), new ReduceByMapping(), diff --git a/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/mapping/ObjectFileSinkMapping.java b/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/mapping/ObjectFileSinkMapping.java new file mode 100644 index 0000000..bd6bed6 --- /dev/null +++ b/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/mapping/ObjectFileSinkMapping.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.wayang.spark.mapping; + +import java.util.Collection; +import java.util.Collections; +import org.apache.wayang.basic.operators.ObjectFileSink; +import org.apache.wayang.core.mapping.Mapping; +import org.apache.wayang.core.mapping.OperatorPattern; +import org.apache.wayang.core.mapping.PlanTransformation; +import org.apache.wayang.core.mapping.ReplacementSubplanFactory; +import org.apache.wayang.core.mapping.SubplanPattern; +import org.apache.wayang.core.types.DataSetType; +import org.apache.wayang.java.operators.JavaObjectFileSink; +import org.apache.wayang.java.platform.JavaPlatform; +import org.apache.wayang.spark.operators.SparkObjectFileSink; +import org.apache.wayang.spark.platform.SparkPlatform; + +/** + * Mapping from {@link ObjectFileSink} to {@link SparkObjectFileSink}. + */ +public class ObjectFileSinkMapping implements Mapping { + + @Override + public Collection<PlanTransformation> getTransformations() { + return Collections.singleton(new PlanTransformation( + this.createSubplanPattern(), + this.createReplacementSubplanFactory(), + SparkPlatform.getInstance() + )); + } + + private SubplanPattern createSubplanPattern() { + final OperatorPattern operatorPattern = new OperatorPattern<>( + "sink", + new ObjectFileSink<>( + null, + DataSetType.none().getDataUnitType().getTypeClass() + ), + false + ); + return SubplanPattern.createSingleton(operatorPattern); + } + + private ReplacementSubplanFactory createReplacementSubplanFactory() { + return new ReplacementSubplanFactory.OfSingleOperators<ObjectFileSink<?>>( + (matchedOperator, epoch) -> new SparkObjectFileSink<>(matchedOperator).at(epoch) + ); + } +} diff --git a/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/operators/SparkObjectFileSink.java b/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/operators/SparkObjectFileSink.java index 7115500..1afc9b6 100644 --- a/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/operators/SparkObjectFileSink.java +++ b/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/operators/SparkObjectFileSink.java @@ -19,6 +19,7 @@ package org.apache.wayang.spark.operators; import org.apache.wayang.basic.channels.FileChannel; +import org.apache.wayang.basic.operators.ObjectFileSink; import org.apache.wayang.core.optimizer.OptimizationContext; import org.apache.wayang.core.plan.wayangplan.ExecutionOperator; import org.apache.wayang.core.plan.wayangplan.Operator; @@ -42,17 +43,18 @@ import java.util.List; * * @see SparkObjectFileSource */ -public class SparkObjectFileSink<T> extends UnarySink<T> implements SparkExecutionOperator { +public class SparkObjectFileSink<T> extends ObjectFileSink<T> implements SparkExecutionOperator { - private final String targetPath; + public SparkObjectFileSink(ObjectFileSink<T> that) { + super(that); + } public SparkObjectFileSink(DataSetType<T> type) { this(null, type); } public SparkObjectFileSink(String targetPath, DataSetType<T> type) { - super(type); - this.targetPath = targetPath; + super(targetPath, type); } @Override @@ -64,8 +66,14 @@ public class SparkObjectFileSink<T> extends UnarySink<T> implements SparkExecuti assert inputs.length == this.getNumInputs(); assert outputs.length <= 1; - final FileChannel.Instance output = (FileChannel.Instance) outputs[0]; - final String targetPath = output.addGivenOrTempPath(this.targetPath, sparkExecutor.getConfiguration()); + final String targetPath; + if(outputs.length > 0) { + final FileChannel.Instance output = (FileChannel.Instance) outputs[0]; + targetPath = output.addGivenOrTempPath(this.textFileUrl, sparkExecutor.getConfiguration()); + }else{ + targetPath = this.textFileUrl; + } + RddChannel.Instance input = (RddChannel.Instance) inputs[0]; input.provideRdd() @@ -78,7 +86,7 @@ public class SparkObjectFileSink<T> extends UnarySink<T> implements SparkExecuti @Override protected ExecutionOperator createCopy() { - return new SparkObjectFileSink<>(targetPath, this.getType()); + return new SparkObjectFileSink<>(this.textFileUrl, this.getType()); } @Override
