This is an automated email from the ASF dual-hosted git repository.

bertty pushed a commit to branch WAYANG-34
in repository https://gitbox.apache.org/repos/asf/incubator-wayang.git

commit b6d9ba516e4e30cbe1644552d937e06ec23f0790
Author: Bertty Contreras-Rojas <[email protected]>
AuthorDate: Fri Sep 24 23:39:27 2021 +0200

    [WAYANG-34] add the ObjectFileSink to the basic operators
    
    Signed-off-by: bertty <[email protected]>
---
 .../scala/org/apache/wayang/api/DataQuanta.scala   | 28 ++++++++++
 .../wayang/basic/operators/ObjectFileSink.java     | 55 +++++++++++++++++++
 .../org/apache/wayang/java/mapping/Mappings.java   |  1 +
 .../wayang/java/mapping/ObjectFileSinkMapping.java | 61 ++++++++++++++++++++++
 .../wayang/java/operators/JavaObjectFileSink.java  | 27 +++++++---
 5 files changed, 164 insertions(+), 8 deletions(-)

diff --git a/wayang-api/wayang-api-scala-java/code/main/scala/org/apache/wayang/api/DataQuanta.scala b/wayang-api/wayang-api-scala-java/code/main/scala/org/apache/wayang/api/DataQuanta.scala
index 019d7a2..df03e6e 100644
--- a/wayang-api/wayang-api-scala-java/code/main/scala/org/apache/wayang/api/DataQuanta.scala
+++ b/wayang-api/wayang-api-scala-java/code/main/scala/org/apache/wayang/api/DataQuanta.scala
@@ -799,6 +799,34 @@ class DataQuanta[Out: ClassTag](val operator: ElementaryOperator, outputIndex: I
   }
 
   /**
+   * Write the data quanta in this instance to an object file. Triggers execution.
+   *
+   * @param url      URL of the object file to write
+   * @param classTag [[ClassTag]] of the data quanta (resolved implicitly)
+   */
+  def writeObjectFile(url: String)(implicit classTag: ClassTag[Out]): Unit = {
+    writeObjectFileJava(url, classTag)
+  }
+
+  /**
+   * Write the data quanta in this instance to an object file. Triggers execution.
+   * Unlike [[writeObjectFile]], the [[ClassTag]] is passed explicitly.
+   *
+   * @param url      URL of the object file to write
+   * @param classTag [[ClassTag]] of the data quanta
+   */
+  def writeObjectFileJava(url: String, classTag: ClassTag[Out]): Unit = {
+    val sink = new ObjectFileSink[Out](
+      url,
+      basicDataUnitType(classTag).getTypeClass
+    )
+    sink.setName(s"Write objects to $url")
+    this.connectTo(sink, 0)
+
+    // Do the execution. The builder's sinks are cleared in a finally block so that a
+    // failed execution does not leave this sink registered with the plan builder.
+    this.planBuilder.sinks += sink
+    try {
+      this.planBuilder.buildAndExecute()
+    } finally {
+      this.planBuilder.sinks.clear()
+    }
+  }
+
+  /**
     * Restrict the producing [[Operator]] to run on certain [[Platform]]s.
     *
     * @param platforms on that the [[Operator]] may be executed
diff --git a/wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/operators/ObjectFileSink.java b/wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/operators/ObjectFileSink.java
new file mode 100644
index 0000000..600eedc
--- /dev/null
+++ b/wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/operators/ObjectFileSink.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.wayang.basic.operators;
+
+import java.util.Objects;
+import org.apache.wayang.core.function.TransformationDescriptor;
+import org.apache.wayang.core.optimizer.costs.DefaultLoadEstimator;
+import org.apache.wayang.core.optimizer.costs.NestableLoadProfileEstimator;
+import org.apache.wayang.core.plan.wayangplan.UnarySink;
+import org.apache.wayang.core.types.DataSetType;
+
+/**
+ * This {@link UnarySink} writes all incoming data quanta to an object file.
+ *
+ * @param <T> type of the data quanta to store
+ */
+public class ObjectFileSink<T> extends UnarySink<T> {
+
+  /**
+   * URL of the file that the data quanta are written to; may be {@code null}.
+   * NOTE(review): despite its name this is not a text file; the name is kept because
+   * platform-specific subclasses read this field directly.
+   */
+  protected final String textFileUrl;
+
+  /**
+   * {@link Class} of the data quanta that are written.
+   */
+  protected final Class<T> tClass;
+
+  /**
+   * Creates a new instance.
+   *
+   * @param targetPath  URL to file that should be written
+   * @param type        {@link DataSetType} of the incoming data quanta
+   */
+  public ObjectFileSink(String targetPath, DataSetType<T> type) {
+    super(type);
+    this.textFileUrl = targetPath;
+    this.tClass = type.getDataUnitType().getTypeClass();
+  }
+
+  /**
+   * Creates a new instance.
+   *
+   * @param targetPath  URL to file that should be written
+   * @param typeClass   {@link Class} of the incoming data quanta
+   */
+  public ObjectFileSink(String targetPath, Class<T> typeClass) {
+    super(DataSetType.createDefault(typeClass));
+    this.textFileUrl = targetPath;
+    this.tClass = typeClass;
+  }
+
+  /**
+   * Creates a copied instance.
+   *
+   * @param that instance that should be copied
+   */
+  public ObjectFileSink(ObjectFileSink<T> that) {
+    super(that);
+    this.textFileUrl = that.textFileUrl;
+    this.tClass = that.tClass;
+  }
+}
diff --git a/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/mapping/Mappings.java b/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/mapping/Mappings.java
index ee9cebc..b2809d6 100644
--- a/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/mapping/Mappings.java
+++ b/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/mapping/Mappings.java
@@ -32,6 +32,7 @@ public class Mappings {
     public static Collection<Mapping> BASIC_MAPPINGS = Arrays.asList(
             new TextFileSourceMapping(),
             new TextFileSinkMapping(),
+            new ObjectFileSinkMapping(),
             new MapMapping(),
             new MapPartitionsMapping(),
             new ReduceByMapping(),
diff --git a/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/mapping/ObjectFileSinkMapping.java b/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/mapping/ObjectFileSinkMapping.java
new file mode 100644
index 0000000..f2bdf5e
--- /dev/null
+++ b/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/mapping/ObjectFileSinkMapping.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.wayang.java.mapping;
+
+import java.util.Collection;
+import java.util.Collections;
+import org.apache.wayang.basic.operators.ObjectFileSink;
+import org.apache.wayang.basic.operators.TextFileSink;
+import org.apache.wayang.core.mapping.Mapping;
+import org.apache.wayang.core.mapping.OperatorPattern;
+import org.apache.wayang.core.mapping.PlanTransformation;
+import org.apache.wayang.core.mapping.ReplacementSubplanFactory;
+import org.apache.wayang.core.mapping.SubplanPattern;
+import org.apache.wayang.core.types.DataSetType;
+import org.apache.wayang.java.operators.JavaObjectFileSink;
+import org.apache.wayang.java.operators.JavaTextFileSink;
+import org.apache.wayang.java.platform.JavaPlatform;
+
+/**
+ * {@link Mapping} that replaces an {@link ObjectFileSink} with a {@link JavaObjectFileSink}
+ * on the {@link JavaPlatform}.
+ */
+public class ObjectFileSinkMapping implements Mapping {
+
+    @Override
+    public Collection<PlanTransformation> getTransformations() {
+        final PlanTransformation transformation = new PlanTransformation(
+                this.createSubplanPattern(),
+                this.createReplacementSubplanFactory(),
+                JavaPlatform.getInstance()
+        );
+        return Collections.singleton(transformation);
+    }
+
+    /**
+     * Builds a single-operator pattern named {@code "sink"} around an example {@link ObjectFileSink}.
+     * NOTE(review): the {@code false} flag presumably disables matching subclasses
+     * (e.g. {@link JavaObjectFileSink} itself); confirm against {@link OperatorPattern}.
+     */
+    private SubplanPattern createSubplanPattern() {
+        final ObjectFileSink<?> exampleSink = new ObjectFileSink<>(
+                null, DataSetType.none().getDataUnitType().getTypeClass()
+        );
+        final OperatorPattern sinkPattern = new OperatorPattern<>("sink", exampleSink, false);
+        return SubplanPattern.createSingleton(sinkPattern);
+    }
+
+    /**
+     * Builds a factory that turns each matched {@link ObjectFileSink} into a
+     * {@link JavaObjectFileSink} created from it via its copy constructor and {@code at(epoch)}.
+     */
+    private ReplacementSubplanFactory createReplacementSubplanFactory() {
+        return new ReplacementSubplanFactory.OfSingleOperators<ObjectFileSink<?>>(
+                (matchedSink, epoch) -> new JavaObjectFileSink<>(matchedSink).at(epoch)
+        );
+    }
+}
diff --git a/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/operators/JavaObjectFileSink.java b/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/operators/JavaObjectFileSink.java
index ff45db6..060efcf 100644
--- a/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/operators/JavaObjectFileSink.java
+++ b/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/operators/JavaObjectFileSink.java
@@ -25,6 +25,8 @@ import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.wayang.basic.channels.FileChannel;
+import org.apache.wayang.basic.operators.ObjectFileSink;
+import org.apache.wayang.basic.operators.TextFileSink;
 import org.apache.wayang.core.api.exception.WayangException;
 import org.apache.wayang.core.optimizer.OptimizationContext;
 import org.apache.wayang.core.plan.wayangplan.ExecutionOperator;
@@ -57,17 +59,18 @@ import java.util.stream.Stream;
  *
  * @see JavaObjectFileSource
  */
-public class JavaObjectFileSink<T> extends UnarySink<T> implements 
JavaExecutionOperator {
+public class JavaObjectFileSink<T> extends ObjectFileSink<T> implements 
JavaExecutionOperator {
 
-    private final String targetPath;
+    /**
+     * Creates a new instance that copies the given {@link ObjectFileSink}.
+     *
+     * @param that the {@link ObjectFileSink} to copy
+     */
+    public JavaObjectFileSink(ObjectFileSink<T> that) {
+        super(that);
+    }

+    /**
+     * Creates a new instance without a target path ({@code null}).
+     *
+     * @param type {@link DataSetType} of the incoming data quanta
+     */
     public JavaObjectFileSink(DataSetType<T> type) {
         this(null, type);
     }

+    /**
+     * Creates a new instance.
+     *
+     * @param targetPath URL of the file to write to; may be {@code null}
+     * @param type       {@link DataSetType} of the incoming data quanta
+     */
     public JavaObjectFileSink(String targetPath, DataSetType<T> type) {
-        super(type);
-        this.targetPath = targetPath;
+        super(targetPath, type);
     }
 
     @Override
@@ -79,9 +82,14 @@ public class JavaObjectFileSink<T> extends UnarySink<T> implements JavaExecution
         assert inputs.length == this.getNumInputs();
 
         // Prepare Hadoop's SequenceFile.Writer.
-        FileChannel.Instance output = (FileChannel.Instance) outputs[0];
-        final String path = output.addGivenOrTempPath(this.targetPath, 
javaExecutor.getCompiler().getConfiguration());
-
+        final String path;
+        if (outputs.length == 1) {
+            // An output channel is present: let it supply (or create) the file path.
+            final FileChannel.Instance output = (FileChannel.Instance) outputs[0];
+            path = output.addGivenOrTempPath(this.textFileUrl,
+                javaExecutor.getCompiler().getConfiguration());
+        } else {
+            // No output channel: write directly to the configured URL, which must then be set.
+            if (this.textFileUrl == null) {
+                throw new WayangException(String.format("%s has no target path to write to.", this));
+            }
+            path = this.textFileUrl;
+        }
         final SequenceFile.Writer.Option fileOption = 
SequenceFile.Writer.file(new Path(path));
         final SequenceFile.Writer.Option keyClassOption = 
SequenceFile.Writer.keyClass(NullWritable.class);
         final SequenceFile.Writer.Option valueClassOption = 
SequenceFile.Writer.valueClass(BytesWritable.class);
@@ -90,6 +98,9 @@ public class JavaObjectFileSink<T> extends UnarySink<T> implements JavaExecution
             // Chunk the stream of data quanta and write the chunks into the 
sequence file.
             StreamChunker streamChunker = new StreamChunker(10, (chunk, size) 
-> {
                 if (chunk.length != size) {
+                    // Keep only the first `size` entries of the chunk before writing it.
                     chunk = Arrays.copyOfRange(chunk, 0, size);
                 }
                 try {
@@ -119,7 +130,7 @@ public class JavaObjectFileSink<T> extends UnarySink<T> implements JavaExecution
 
     @Override
     protected ExecutionOperator createCopy() {
-        return new JavaObjectFileSink<>(this.targetPath, this.getType());
+        return new JavaObjectFileSink<>(this.textFileUrl, this.getType());
     }
 
     @Override

Reply via email to