This is an automated email from the ASF dual-hosted git repository. bertty pushed a commit to branch WAYANG-34 in repository https://gitbox.apache.org/repos/asf/incubator-wayang.git
commit b6d9ba516e4e30cbe1644552d937e06ec23f0790 Author: Bertty Contreras-Rojas <[email protected]> AuthorDate: Fri Sep 24 23:39:27 2021 +0200 [WAYANG-34] add the ObjectFileSink to the basic Signed-off-by: bertty <[email protected]> --- .../scala/org/apache/wayang/api/DataQuanta.scala | 28 ++++++++++ .../wayang/basic/operators/ObjectFileSink.java | 55 +++++++++++++++++++ .../org/apache/wayang/java/mapping/Mappings.java | 1 + .../wayang/java/mapping/ObjectFileSinkMapping.java | 61 ++++++++++++++++++++++ .../wayang/java/operators/JavaObjectFileSink.java | 27 +++++++--- 5 files changed, 164 insertions(+), 8 deletions(-) diff --git a/wayang-api/wayang-api-scala-java/code/main/scala/org/apache/wayang/api/DataQuanta.scala b/wayang-api/wayang-api-scala-java/code/main/scala/org/apache/wayang/api/DataQuanta.scala index 019d7a2..df03e6e 100644 --- a/wayang-api/wayang-api-scala-java/code/main/scala/org/apache/wayang/api/DataQuanta.scala +++ b/wayang-api/wayang-api-scala-java/code/main/scala/org/apache/wayang/api/DataQuanta.scala @@ -799,6 +799,34 @@ class DataQuanta[Out: ClassTag](val operator: ElementaryOperator, outputIndex: I } /** + * Write the data quanta in this instance to a Object file. Triggers execution. + * + * @param url URL to the text file + */ + def writeObjectFile(url: String)(implicit classTag: ClassTag[Out]): Unit = { + writeObjectFileJava(url, classTag) + } + + /** + * Write the data quanta in this instance to a Object file. Triggers execution. + * + * @param url URL to the text file + */ + def writeObjectFileJava(url: String, classTag: ClassTag[Out]): Unit ={ + val sink = new ObjectFileSink[Out]( + url, + basicDataUnitType(classTag).getTypeClass + ) + sink.setName(s"Write objects to $url") + this.connectTo(sink, 0) + + // Do the execution. + this.planBuilder.sinks += sink + this.planBuilder.buildAndExecute() + this.planBuilder.sinks.clear() + } + + /** * Restrict the producing [[Operator]] to run on certain [[Platform]]s. * * @param platforms on that the [[Operator]] may be executed diff --git a/wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/operators/ObjectFileSink.java b/wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/operators/ObjectFileSink.java new file mode 100644 index 0000000..600eedc --- /dev/null +++ b/wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/operators/ObjectFileSink.java @@ -0,0 +1,55 @@ +package org.apache.wayang.basic.operators; + +import java.util.Objects; +import org.apache.wayang.core.function.TransformationDescriptor; +import org.apache.wayang.core.optimizer.costs.DefaultLoadEstimator; +import org.apache.wayang.core.optimizer.costs.NestableLoadProfileEstimator; +import org.apache.wayang.core.plan.wayangplan.UnarySink; +import org.apache.wayang.core.types.DataSetType; + +/** + * This {@link UnarySink} writes all incoming data quanta to a Object file. + * + * @param <T> type of the object to store + */ +public class ObjectFileSink<T> extends UnarySink<T> { + + protected final String textFileUrl; + + protected final Class<T> tClass; + + /** + * Creates a new instance. + * + * @param targetPath URL to file that should be written + * @param type {@link DataSetType} of the incoming data quanta + */ + public ObjectFileSink(String targetPath, DataSetType<T> type) { + super(type); + this.textFileUrl = targetPath; + this.tClass = type.getDataUnitType().getTypeClass(); + } + + /** + * Creates a new instance. + * + * @param textFileUrl URL to file that should be written + * @param typeClass {@link Class} of incoming data quanta + */ + public ObjectFileSink(String textFileUrl, Class<T> typeClass) { + super(DataSetType.createDefault(typeClass)); + this.textFileUrl = textFileUrl; + this.tClass = typeClass; + } + + /** + * Creates a copied instance. + * + * @param that should be copied + */ + public ObjectFileSink(ObjectFileSink<T> that) { + super(that); + this.textFileUrl = that.textFileUrl; + this.tClass = that.tClass; + } +} diff --git a/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/mapping/Mappings.java b/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/mapping/Mappings.java index ee9cebc..b2809d6 100644 --- a/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/mapping/Mappings.java +++ b/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/mapping/Mappings.java @@ -32,6 +32,7 @@ public class Mappings { public static Collection<Mapping> BASIC_MAPPINGS = Arrays.asList( new TextFileSourceMapping(), new TextFileSinkMapping(), + new ObjectFileSinkMapping(), new MapMapping(), new MapPartitionsMapping(), new ReduceByMapping(), diff --git a/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/mapping/ObjectFileSinkMapping.java b/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/mapping/ObjectFileSinkMapping.java new file mode 100644 index 0000000..f2bdf5e --- /dev/null +++ b/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/mapping/ObjectFileSinkMapping.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.wayang.java.mapping; + +import java.util.Collection; +import java.util.Collections; +import org.apache.wayang.basic.operators.ObjectFileSink; +import org.apache.wayang.basic.operators.TextFileSink; +import org.apache.wayang.core.mapping.Mapping; +import org.apache.wayang.core.mapping.OperatorPattern; +import org.apache.wayang.core.mapping.PlanTransformation; +import org.apache.wayang.core.mapping.ReplacementSubplanFactory; +import org.apache.wayang.core.mapping.SubplanPattern; +import org.apache.wayang.core.types.DataSetType; +import org.apache.wayang.java.operators.JavaObjectFileSink; +import org.apache.wayang.java.operators.JavaTextFileSink; +import org.apache.wayang.java.platform.JavaPlatform; + +/** + * Mapping from {@link ObjectFileSink} to {@link JavaObjectFileSink}. + */ +public class ObjectFileSinkMapping implements Mapping { + + @Override + public Collection<PlanTransformation> getTransformations() { + return Collections.singleton(new PlanTransformation( + this.createSubplanPattern(), + this.createReplacementSubplanFactory(), + JavaPlatform.getInstance() + )); + } + + private SubplanPattern createSubplanPattern() { + final OperatorPattern operatorPattern = new OperatorPattern<>( + "sink", new ObjectFileSink<>(null, DataSetType.none().getDataUnitType().getTypeClass()), false + ); + return SubplanPattern.createSingleton(operatorPattern); + } + + private ReplacementSubplanFactory createReplacementSubplanFactory() { + return new ReplacementSubplanFactory.OfSingleOperators<ObjectFileSink<?>>( + (matchedOperator, epoch) -> new JavaObjectFileSink<>(matchedOperator).at(epoch) + ); + } +} diff --git a/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/operators/JavaObjectFileSink.java b/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/operators/JavaObjectFileSink.java index ff45db6..060efcf 100644 --- a/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/operators/JavaObjectFileSink.java +++ b/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/operators/JavaObjectFileSink.java @@ -25,6 +25,8 @@ import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.SequenceFile; import org.apache.wayang.basic.channels.FileChannel; +import org.apache.wayang.basic.operators.ObjectFileSink; +import org.apache.wayang.basic.operators.TextFileSink; import org.apache.wayang.core.api.exception.WayangException; import org.apache.wayang.core.optimizer.OptimizationContext; import org.apache.wayang.core.plan.wayangplan.ExecutionOperator; @@ -57,17 +59,18 @@ import java.util.stream.Stream; * * @see JavaObjectFileSource */ -public class JavaObjectFileSink<T> extends UnarySink<T> implements JavaExecutionOperator { +public class JavaObjectFileSink<T> extends ObjectFileSink<T> implements JavaExecutionOperator { - private final String targetPath; + public JavaObjectFileSink(ObjectFileSink<T> that) { + super(that); + } public JavaObjectFileSink(DataSetType<T> type) { this(null, type); } public JavaObjectFileSink(String targetPath, DataSetType<T> type) { - super(type); - this.targetPath = targetPath; + super(targetPath, type); } @Override @@ -79,9 +82,14 @@ public class JavaObjectFileSink<T> extends UnarySink<T> implements JavaExecution assert inputs.length == this.getNumInputs(); // Prepare Hadoop's SequenceFile.Writer. - FileChannel.Instance output = (FileChannel.Instance) outputs[0]; - final String path = output.addGivenOrTempPath(this.targetPath, javaExecutor.getCompiler().getConfiguration()); - + final String path; + if(outputs.length == 1) { + FileChannel.Instance output = (FileChannel.Instance) outputs[0]; + path = output.addGivenOrTempPath(this.textFileUrl, + javaExecutor.getCompiler().getConfiguration()); + }else{ + path = this.textFileUrl; + } final SequenceFile.Writer.Option fileOption = SequenceFile.Writer.file(new Path(path)); final SequenceFile.Writer.Option keyClassOption = SequenceFile.Writer.keyClass(NullWritable.class); final SequenceFile.Writer.Option valueClassOption = SequenceFile.Writer.valueClass(BytesWritable.class); @@ -90,6 +98,9 @@ public class JavaObjectFileSink<T> extends UnarySink<T> implements JavaExecution // Chunk the stream of data quanta and write the chunks into the sequence file. StreamChunker streamChunker = new StreamChunker(10, (chunk, size) -> { if (chunk.length != size) { + System.out.println("heer"); + System.out.println(chunk.length); + System.out.println(size); chunk = Arrays.copyOfRange(chunk, 0, size); } try { @@ -119,7 +130,7 @@ public class JavaObjectFileSink<T> extends UnarySink<T> implements JavaExecution @Override protected ExecutionOperator createCopy() { - return new JavaObjectFileSink<>(this.targetPath, this.getType()); + return new JavaObjectFileSink<>(this.textFileUrl, this.getType()); } @Override
