This is an automated email from the ASF dual-hosted git repository.

thw pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apex-malhar.git

commit ddbf26f00f5fa04774792294ccab7599667f1b77
Author: Chandni Singh <csi...@apache.org>
AuthorDate: Wed Mar 2 18:27:15 2016 -0800

    Atomic file output app
---
 .../com/example/myapexapp/AtomicFileOutputApp.java | 101 +++++++++++++++++++++
 .../example/myapexapp/AtomicFileOutputAppTest.java |  93 +++++++++++++++++++
 2 files changed, 194 insertions(+)

diff --git 
a/examples/exactly-once/src/main/java/com/example/myapexapp/AtomicFileOutputApp.java
 
b/examples/exactly-once/src/main/java/com/example/myapexapp/AtomicFileOutputApp.java
new file mode 100644
index 0000000..55f8cc6
--- /dev/null
+++ 
b/examples/exactly-once/src/main/java/com/example/myapexapp/AtomicFileOutputApp.java
@@ -0,0 +1,101 @@
+/**
+ * Copyright (c) 2015 DataTorrent, Inc.
+ * All rights reserved.
+ */
+package com.example.myapexapp;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.api.annotation.ApplicationAnnotation;
+import com.datatorrent.contrib.kafka.KafkaSinglePortStringInputOperator;
+import com.datatorrent.lib.io.ConsoleOutputOperator;
+import com.datatorrent.lib.io.IdempotentStorageManager;
+import com.datatorrent.lib.io.fs.AbstractFileOutputOperator;
+import com.datatorrent.lib.util.KeyValPair;
+
+@ApplicationAnnotation(name = "AtomicFileOutput")
+public class AtomicFileOutputApp implements StreamingApplication
+{
+  @Override
+  public void populateDAG(DAG dag, Configuration configuration)
+  {
+    KafkaSinglePortStringInputOperator kafkaInput = 
dag.addOperator("kafkaInput",
+        new KafkaSinglePortStringInputOperator());
+    kafkaInput.setIdempotentStorageManager(new 
IdempotentStorageManager.FSIdempotentStorageManager());
+
+    Application.UniqueCounterFlat count = dag.addOperator("count", new 
Application.UniqueCounterFlat());
+
+    FileWriter fileWriter = dag.addOperator("fileWriter", new FileWriter());
+
+    ConsoleOutputOperator cons = dag.addOperator("console", new 
ConsoleOutputOperator());
+    dag.addStream("words", kafkaInput.outputPort, count.data);
+    dag.addStream("counts", count.counts, fileWriter.input, cons.input);
+  }
+
+  /**
+   * This implementation of {@link AbstractFileOutputOperator} writes to a 
single file. However when it doesn't
+   * receive any tuples in an application window then it finalizes the file, 
i.e., the file is completed and will not
+   * be opened again.
+   * <p/>
+   * If more tuples are received after a hiatus then they will be written to a 
part file -
+   * {@link #FILE_NAME_PREFIX}.{@link #part}
+   */
+  public static class FileWriter extends 
AbstractFileOutputOperator<KeyValPair<String, Integer>>
+  {
+    static final String FILE_NAME_PREFIX = "filestore";
+
+    private int part;
+    private transient String currentFileName;
+
+    private transient boolean receivedTuples;
+
+    @Override
+    public void setup(Context.OperatorContext context)
+    {
+      currentFileName = (part == 0) ? FILE_NAME_PREFIX : FILE_NAME_PREFIX + 
"." + part;
+      super.setup(context);
+    }
+
+    @Override
+    protected String getFileName(KeyValPair<String, Integer> keyValPair)
+    {
+      return currentFileName;
+    }
+
+    @Override
+    protected byte[] getBytesForTuple(KeyValPair<String, Integer> keyValPair)
+    {
+      return (keyValPair.toString() + "\n").getBytes();
+    }
+
+    @Override
+    public void beginWindow(long windowId)
+    {
+      super.beginWindow(windowId);
+      receivedTuples = false;
+    }
+
+    @Override
+    protected void processTuple(KeyValPair<String, Integer> tuple)
+    {
+      super.processTuple(tuple);
+      receivedTuples = true;
+    }
+
+    @Override
+    public void endWindow()
+    {
+      super.endWindow();
+      //request for finalization if there is no input. This is done 
automatically if the file is rotated periodically
+      // or has a size threshold.
+      if (!receivedTuples && !endOffsets.isEmpty()) {
+        requestFinalize(currentFileName);
+        part++;
+        currentFileName = FILE_NAME_PREFIX + "." + part;
+      }
+    }
+  }
+}
diff --git 
a/examples/exactly-once/src/test/java/com/example/myapexapp/AtomicFileOutputAppTest.java
 
b/examples/exactly-once/src/test/java/com/example/myapexapp/AtomicFileOutputAppTest.java
new file mode 100644
index 0000000..b539394
--- /dev/null
+++ 
b/examples/exactly-once/src/test/java/com/example/myapexapp/AtomicFileOutputAppTest.java
@@ -0,0 +1,93 @@
+package com.example.myapexapp;
+
+import java.io.File;
+
+import javax.validation.ConstraintViolationException;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+
+import com.google.common.collect.Lists;
+
+import com.datatorrent.api.LocalMode;
+import com.datatorrent.contrib.kafka.KafkaOperatorTestBase;
+import com.datatorrent.contrib.kafka.KafkaTestProducer;
+
+/**
+ * Copyright (c) 2015 DataTorrent, Inc.
+ * All rights reserved.
+ */
+public class AtomicFileOutputAppTest
+{
+  private final KafkaOperatorTestBase kafkaLauncher = new 
KafkaOperatorTestBase();
+  private static final Logger LOG = 
LoggerFactory.getLogger(ApplicationTest.class);
+  private static final String KAFKA_TOPIC = "exactly-once-test";
+  private static final String TARGET_DIR = "target/atomicFileOutput";
+
+  @Before
+  public void beforeTest() throws Exception {
+    kafkaLauncher.baseDir = "target/" + this.getClass().getName();
+    FileUtils.deleteDirectory(new File(kafkaLauncher.baseDir));
+    kafkaLauncher.startZookeeper();
+    kafkaLauncher.startKafkaServer();
+    kafkaLauncher.createTopic(0, KAFKA_TOPIC);
+  }
+
+  @After
+  public void afterTest() {
+    kafkaLauncher.stopKafkaServer();
+    kafkaLauncher.stopZookeeper();
+  }
+
+  @Test
+  public void testApplication() throws Exception {
+    try {
+
+      File targetDir = new File(TARGET_DIR);
+      FileUtils.deleteDirectory(targetDir);
+      FileUtils.forceMkdir(targetDir);
+
+      // produce some test data
+      KafkaTestProducer p = new KafkaTestProducer(KAFKA_TOPIC);
+      String[] words = "count the words from kafka and store them in the 
db".split("\\s+");
+      p.setMessages(Lists.newArrayList(words));
+      new Thread(p).start();
+
+      LocalMode lma = LocalMode.newInstance();
+      Configuration conf = new Configuration(false);
+      
conf.addResource(this.getClass().getResourceAsStream("/META-INF/properties.xml"));
+      conf.set("dt.operator.kafkaInput.prop.topic", KAFKA_TOPIC);
+      conf.set("dt.operator.kafkaInput.prop.zookeeper", "localhost:" + 
KafkaOperatorTestBase.TEST_ZOOKEEPER_PORT[0]);
+      conf.set("dt.operator.kafkaInput.prop.maxTuplesPerWindow", "1"); // 
consume one word per window
+      conf.set("dt.operator.fileWriter.prop.filePath", TARGET_DIR);
+
+      lma.prepareDAG(new AtomicFileOutputApp(), conf);
+      LocalMode.Controller lc = lma.getController();
+      lc.runAsync(); // test will terminate after results are available
+
+      long timeout = System.currentTimeMillis() + 60000; // 60s timeout
+
+      File outputFile = new File(TARGET_DIR, 
AtomicFileOutputApp.FileWriter.FILE_NAME_PREFIX);
+      while (!outputFile.exists() && timeout > System.currentTimeMillis()) {
+        Thread.sleep(1000);
+        LOG.debug("Waiting for {}", outputFile);
+      }
+
+      Assert.assertTrue("output file exists " + 
AtomicFileOutputApp.FileWriter.FILE_NAME_PREFIX, outputFile.exists() &&
+          outputFile.isFile());
+
+      lc.shutdown();
+
+    } catch (ConstraintViolationException e) {
+      Assert.fail("constraint violations: " + e.getConstraintViolations());
+    }
+  }
+
+}

-- 
To stop receiving notification emails like this one, please contact
"commits@apex.apache.org" <commits@apex.apache.org>.

Reply via email to