[
https://issues.apache.org/jira/browse/FLINK-828?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14049738#comment-14049738
]
ASF GitHub Bot commented on FLINK-828:
--------------------------------------
Github user rmetzger commented on a diff in the pull request:
https://github.com/apache/incubator-flink/pull/55#discussion_r14445872
--- Diff:
stratosphere-examples/stratosphere-java-examples/src/main/java/eu/stratosphere/example/java/relational/FilterAndCountIncompleteLines.java
---
@@ -0,0 +1,247 @@
+/***********************************************************************************************************************
+ *
+ * Copyright (C) 2010-2013 by the Stratosphere project
(http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may
not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
express or implied. See the License for the
+ * specific language governing permissions and limitations under the
License.
+ *
+
**********************************************************************************************************************/
+package eu.stratosphere.example.java.relational;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import eu.stratosphere.api.common.JobExecutionResult;
+import eu.stratosphere.api.common.accumulators.Accumulator;
+import eu.stratosphere.api.java.DataSet;
+import eu.stratosphere.api.java.ExecutionEnvironment;
+import eu.stratosphere.api.java.functions.FilterFunction;
+import eu.stratosphere.api.java.tuple.Tuple;
+import eu.stratosphere.api.java.tuple.Tuple3;
+import eu.stratosphere.configuration.Configuration;
+
+/**
+ * This program filters lines from a CSV file with empty fields. In doing
so, it counts the number of empty fields per
+ * column within a CSV file using a custom accumulator for vectors. In
this context, empty fields are those that at
+ * most contain whitespace characters like space and tab.
+ * <p>
+ * The input file is a plain text CSV file with the semicolon as field
separator and double quotes as field delimiters
+ * and three columns. See {@link #getDataSet(ExecutionEnvironment)} for
configuration.
+ * <p>
+ * Usage: <code>FilterAndCountIncompleteLines <input file path or
"example"> <result path></code> <br>
+ * <p>
+ * This example shows how to use:
+ * <ul>
+ * <li>custom accumulators
+ * <li>tuple data types
+ * <li>inline-defined functions
+ * </ul>
+ */
+@SuppressWarnings("serial")
+public class FilterAndCountIncompleteLines {
+
+ //
*************************************************************************
+ // PROGRAM
+ //
*************************************************************************
+
+ private static final String EMPTY_FIELD_ACCUMULATOR = "empty-fields";
+
+ public static void main(final String[] args) throws Exception {
+
+ if (!parseParameters(args)) {
+ return;
+ }
+
+ final ExecutionEnvironment env =
ExecutionEnvironment.getExecutionEnvironment();
+
+ // get the data set
+ final DataSet<Tuple> file = getDataSet(env);
+
+ // filter lines with empty fields
+ final DataSet<Tuple> filteredLines = file
+ .filter(new FilterFunction<Tuple>() {
+
+ // create a new accumulator in each
filter function instance
+ // accumulators can be merged later on
+ private final VectorAccumulator
emptyFieldCounter = new VectorAccumulator();
+
+ @Override
+ public void open(final Configuration
parameters) throws Exception {
+ super.open(parameters);
+
+ // register the accumulator
instance
+
getRuntimeContext().addAccumulator(EMPTY_FIELD_ACCUMULATOR,
+
this.emptyFieldCounter);
+ }
+
+ @Override
+ public boolean filter(final Tuple t) {
+ boolean containsEmptyFields =
false;
+
+ // iterate over the tuple
fields looking for empty ones
+ for (int pos = 0; pos <
t.getArity(); pos++) {
+
+ final String field =
t.getField(pos);
+ if (field == null ||
field.trim().isEmpty()) {
+
containsEmptyFields = true;
+
+ // if an empty
field is encountered, update the
+ // accumulator
+
this.emptyFieldCounter.add(pos);
+ }
+ }
+
+ return !containsEmptyFields;
+ }
+ });
+
+ // Here, we could do further processing with the filtered
lines...
+ filteredLines.writeAsCsv(outputPath);
+
+ // execute program
+ final JobExecutionResult result = env.execute("Accumulator
example");
+
+ // get the accumulator result via its registration key
+ final List<Integer> emptyFields =
result.getAccumulatorResult(EMPTY_FIELD_ACCUMULATOR);
+ System.out.format("Number of detected empty fields per column:
%s\n",
+ emptyFields);
+
+ }
+
+ //
*************************************************************************
+ // UTIL METHODS
+ //
*************************************************************************
+
+ private static String filePath;
+ private static String outputPath;
+
+ private static boolean parseParameters(final String[] programArguments)
{
+
+ if (programArguments.length > 0) {
+ if (programArguments.length == 2) {
+ filePath = programArguments[0];
+ outputPath = programArguments[1];
+ } else {
+ System.err.println("Usage:
FilterAndCountIncompleteLines <input file path or \"example\"> <result path>");
+ return false;
+ }
+ } else {
+ System.err.println("This program expects a
semicolon-delimited CSV file with nine columns.\n"
--- End diff --
Can you change the code so that it does not require any parameters at all?
(Otherwise the user still has to figure out how to pass parameters to the program.)
So if the user does not specify any arguments, we use
`getExampleInputTuples()` to get some input data, and we print the result to
stdout using `dataSet.print()`.
> Add Java example program with Accumulators
> ------------------------------------------
>
> Key: FLINK-828
> URL: https://issues.apache.org/jira/browse/FLINK-828
> Project: Flink
> Issue Type: Bug
> Reporter: Fabian Hueske
> Assignee: Sebastian Kruse
> Labels: github-import, starter
> Fix For: pre-apache
>
>
> We need a Java example program that shows the use of Accumulators.
> The new example program should follow the general layout of the other example
> programs, i.e., run without parameters on default data, have the same code
> structure, etc.
> ---------------- Imported from GitHub ----------------
> Url: https://github.com/stratosphere/stratosphere/issues/828
> Created by: [fhueske|https://github.com/fhueske]
> Labels: documentation, java api,
> Milestone: Release 0.6 (unplanned)
> Created at: Sat May 17 12:46:44 CEST 2014
> State: open
--
This message was sent by Atlassian JIRA
(v6.2#6252)