[
https://issues.apache.org/jira/browse/FLINK-828?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14049681#comment-14049681
]
ASF GitHub Bot commented on FLINK-828:
--------------------------------------
Github user rmetzger commented on a diff in the pull request:
https://github.com/apache/incubator-flink/pull/55#discussion_r14443106
--- Diff:
stratosphere-examples/stratosphere-java-examples/src/main/java/eu/stratosphere/example/java/relational/FilterAndCountIncompleteLines.java
---
@@ -0,0 +1,281 @@
+/***********************************************************************************************************************
+ *
+ * Copyright (C) 2010-2013 by the Stratosphere project
(http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may
not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
express or implied. See the License for the
+ * specific language governing permissions and limitations under the
License.
+ *
+
**********************************************************************************************************************/
+package eu.stratosphere.example.java.relational;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import eu.stratosphere.api.common.JobExecutionResult;
+import eu.stratosphere.api.common.accumulators.Accumulator;
+import eu.stratosphere.api.java.DataSet;
+import eu.stratosphere.api.java.ExecutionEnvironment;
+import eu.stratosphere.api.java.functions.FilterFunction;
+import eu.stratosphere.api.java.operators.DataSource;
+import eu.stratosphere.api.java.tuple.Tuple;
+import eu.stratosphere.configuration.Configuration;
+
/**
 * This program filters out the lines of a CSV file that contain empty fields. While doing so, it
 * counts the number of empty fields per column using a custom accumulator for vectors. In this
 * context, empty fields are those that contain at most whitespace characters, such as spaces and
 * tabs.
 *
 * <p>
 * The input file is a plain-text CSV file with nine columns. It uses the semicolon as field
 * separator and double quotes to enclose field values. See
 * {@link #getDataSet(ExecutionEnvironment)} for configuration.
 *
 * <p>
 * Usage: {@code FilterAndCountIncompleteLines <input file path> <result path>} <br>
 *
 * <p>
 * This example shows how to use:
 * <ul>
 * <li>custom accumulators
 * <li>tuple data types
 * <li>inline-defined functions
 * </ul>
 */
+@SuppressWarnings("serial")
+public class FilterAndCountIncompleteLines {
+
+ //
*************************************************************************
+ // PROGRAM
+ //
*************************************************************************
+
+ private static final String EMPTY_FIELD_ACCUMULATOR = "empty-fields";
+
+ public static void main(String[] args) throws Exception {
+
+ if (!parseParameters(args)) {
+ return;
+ }
+
+ final ExecutionEnvironment env = ExecutionEnvironment
+ .getExecutionEnvironment();
+
+ // get the data set
+ DataSet<Tuple> file = getDataSet(env);
+
+ // filter lines with empty fields
+ DataSet<Tuple> filteredLines = file.filter(new
FilterFunction<Tuple>() {
+
+ // create a new accumulator in each filter function
instance
+ // accumulators can be merged later on
+ private VectorAccumulator emptyFieldCounter = new
VectorAccumulator();
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see
+ *
eu.stratosphere.api.common.functions.AbstractFunction#open(eu
+ * .stratosphere.configuration.Configuration)
+ */
+ @Override
+ public void open(Configuration parameters) throws
Exception {
+ super.open(parameters);
+
+ // register the accumulator instance
+
getRuntimeContext().addAccumulator(EMPTY_FIELD_ACCUMULATOR,
+ this.emptyFieldCounter);
+ }
+
+ @Override
+ public boolean filter(Tuple t) {
+ boolean containsEmptyFields = false;
+
+ // iterate over the tuple fields looking for
empty ones
+ for (int pos = 0; pos < t.getArity(); pos++) {
+
+ String field = t.getField(pos);
+ if (field == null ||
field.trim().isEmpty()) {
+ containsEmptyFields = true;
+
+ // if an empty field is
encountered, update the
+ // accumulator
+ this.emptyFieldCounter.add(pos);
+ }
+ }
+
+ return !containsEmptyFields;
+ }
+ });
+
+ // Here, we could do further processing with the filtered
lines...
+ filteredLines.writeAsCsv(outputPath);
+
+ // execute program
+ JobExecutionResult result = env.execute("Accumulator example");
+
+ // get the accumulator result via its registration key
+ List<Integer> emptyFields =
result.getAccumulatorResult(EMPTY_FIELD_ACCUMULATOR);
+ System.out.format("Number of detected empty fields per column:
%s\n",
+ emptyFields);
+
+ }
+
+ //
*************************************************************************
+ // UTIL METHODS
+ //
*************************************************************************
+
+ private static String filePath;
+ private static String outputPath;
+
+ private static boolean parseParameters(String[] programArguments) {
+
+ if (programArguments.length > 0) {
+ if (programArguments.length == 2) {
+ filePath = programArguments[0];
+ outputPath = programArguments[1];
+ } else {
+ System.err
+ .println("Usage:
FilterAndCountIncompleteLines <input file path> <result path>");
+ return false;
+ }
+ } else {
+ System.err
+ .println("This program expects a
semicolon-delimited CSV file with nine columns.\n"
+ + " Usage:
FilterAndCountIncompleteLines <input file path> <result path>");
+ return false;
+ }
+ return true;
+ }
+
+ @SuppressWarnings("unchecked")
+ private static DataSource<Tuple> getDataSet(ExecutionEnvironment env) {
+
+ DataSource<? extends Tuple> source = env
+ .readCsvFile(filePath)
+ .fieldDelimiter(';')
+ .includeFields("111111111")
+ .types(String.class, String.class,
String.class, String.class,
+ String.class, String.class,
String.class, String.class,
+ String.class);
+ return (DataSource<Tuple>) source;
+ }
+
    /**
     * An accumulator that lets parallel function instances increment the components of a vector.
     * The {@link #add(Integer)} method increases the <i>n</i>-th vector component by 1, where
     * <i>n</i> is the method's parameter. The vector grows automatically as needed.
     */
+ public static class VectorAccumulator implements
+ Accumulator<Integer, List<Integer>> {
+
+ /** Stores the accumulated vector components. */
+ private final List<Integer> resultVector = new
ArrayList<Integer>();
+
+ /**
+ * Increases the result vector component at the specified
position by 1.
+ */
+ @Override
+ public void add(Integer position) {
+ updateResultVector(position, 1);
+ }
+
+ /**
+ * Increases the result vector component at the specified
position by
+ * the specified delta.
+ */
+ private void updateResultVector(int position, int delta) {
+ // inflate the vector to contain the given position
+ while (resultVector.size() <= position) {
+ resultVector.add(0);
+ }
+
+ // increment the component value
+ int component = resultVector.get(position);
+ this.resultVector.set(position, component + delta);
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see
+ *
eu.stratosphere.api.common.accumulators.Accumulator#getLocalValue()
+ */
+ @Override
+ public List<Integer> getLocalValue() {
+ return this.resultVector;
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see
eu.stratosphere.api.common.accumulators.Accumulator#resetLocal()
+ */
+ @Override
+ public void resetLocal() {
+ // clear the result vector if the accumulator instance
shall be reused
+ this.resultVector.clear();
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see
+ *
eu.stratosphere.api.common.accumulators.Accumulator#merge(eu.stratosphere
+ * .api.common.accumulators.Accumulator)
+ */
+ @Override
+ public void merge(Accumulator<Integer, List<Integer>> other) {
+
+ // merge two vector accumulators by adding their up
their vector components
+ List<Integer> otherVector = other.getLocalValue();
+ for (int index = 0; index < otherVector.size();
index++) {
+ updateResultVector(index,
otherVector.get(index));
+ }
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see
+ *
eu.stratosphere.core.io.IOReadableWritable#write(java.io.DataOutput)
--- End diff --
Can you remove these Javadocs? Every modern IDE lets you jump to the
method definition in the interface.
> Add Java example program with Accumulators
> ------------------------------------------
>
> Key: FLINK-828
> URL: https://issues.apache.org/jira/browse/FLINK-828
> Project: Flink
> Issue Type: Bug
> Reporter: GitHub Import
> Labels: github-import, starter
> Fix For: pre-apache
>
>
> We need a Java example program that shows the use of Accumulators.
> The new example program should follow the general layout of the other example
> programs, i.e., run without parameters on default data, have the same code
> structure, etc.
> ---------------- Imported from GitHub ----------------
> Url: https://github.com/stratosphere/stratosphere/issues/828
> Created by: [fhueske|https://github.com/fhueske]
> Labels: documentation, java api,
> Milestone: Release 0.6 (unplanned)
> Created at: Sat May 17 12:46:44 CEST 2014
> State: open
--
This message was sent by Atlassian JIRA
(v6.2#6252)