[ 
https://issues.apache.org/jira/browse/FLINK-828?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14049676#comment-14049676
 ] 

ASF GitHub Bot commented on FLINK-828:
--------------------------------------

Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/55#discussion_r14443022
  
    --- Diff: 
stratosphere-examples/stratosphere-java-examples/src/main/java/eu/stratosphere/example/java/relational/FilterAndCountIncompleteLines.java
 ---
    @@ -0,0 +1,281 @@
    
+/***********************************************************************************************************************
    + *
    + * Copyright (C) 2010-2013 by the Stratosphere project 
(http://stratosphere.eu)
    + *
    + * Licensed under the Apache License, Version 2.0 (the "License"); you may 
not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on
    + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either 
express or implied. See the License for the
    + * specific language governing permissions and limitations under the 
License.
    + *
    + 
**********************************************************************************************************************/
    +package eu.stratosphere.example.java.relational;
    +
    +import java.io.DataInput;
    +import java.io.DataOutput;
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.List;
    +
    +import eu.stratosphere.api.common.JobExecutionResult;
    +import eu.stratosphere.api.common.accumulators.Accumulator;
    +import eu.stratosphere.api.java.DataSet;
    +import eu.stratosphere.api.java.ExecutionEnvironment;
    +import eu.stratosphere.api.java.functions.FilterFunction;
    +import eu.stratosphere.api.java.operators.DataSource;
    +import eu.stratosphere.api.java.tuple.Tuple;
    +import eu.stratosphere.configuration.Configuration;
    +
    +/**
    + * This program filters lines from a CSV file with empty fields. In doing 
so, it
    + * counts the number of empty fields per column within a CSV file using a 
custom
    + * accumulator for vectors. In this context, empty fields are those, that 
at
    + * most contain whitespace characters like space and tab.
    + * 
    + * <p>
    + * The input file is a plain text CSV file with the semicolon as field 
separator
    + * and double quotes as field delimiters and 9 columns. See
    + * {@link #getDataSet(ExecutionEnvironment)} for configuration.
    + * 
    + * <p>
    + * Usage: <code>FilterAndCountIncompleteLines &lt;input file path&gt; 
&lt;result path&gt;</code> <br>
    + * 
    + * <p>
    + * This example shows how to use:
    + * <ul>
    + * <li>custom accumulators
    + * <li>tuple data types
    + * <li>inline-defined functions
    + * </ul>
    + */
    +@SuppressWarnings("serial")
    +public class FilterAndCountIncompleteLines {
    +
    +   // 
*************************************************************************
    +   // PROGRAM
    +   // 
*************************************************************************
    +
    +   private static final String EMPTY_FIELD_ACCUMULATOR = "empty-fields";
    +
    +   public static void main(String[] args) throws Exception {
    +
    +           if (!parseParameters(args)) {
    +                   return;
    +           }
    +
    +           final ExecutionEnvironment env = ExecutionEnvironment
    +                           .getExecutionEnvironment();
    +
    +           // get the data set
    +           DataSet<Tuple> file = getDataSet(env);
    +
    +           // filter lines with empty fields
    +           DataSet<Tuple> filteredLines = file.filter(new 
FilterFunction<Tuple>() {
    +
    +                   // create a new accumulator in each filter function 
instance
    +                   // accumulators can be merged later on
    +                   private VectorAccumulator emptyFieldCounter = new 
VectorAccumulator();
    +
    +                   /*
    +                    * (non-Javadoc)
    +                    * 
    +                    * @see
    +                    * 
eu.stratosphere.api.common.functions.AbstractFunction#open(eu
    +                    * .stratosphere.configuration.Configuration)
    +                    */
    +                   @Override
    +                   public void open(Configuration parameters) throws 
Exception {
    +                           super.open(parameters);
    +
    +                           // register the accumulator instance
    +                           
getRuntimeContext().addAccumulator(EMPTY_FIELD_ACCUMULATOR,
    +                                           this.emptyFieldCounter);
    +                   }
    +
    +                   @Override
    +                   public boolean filter(Tuple t) {
    +                           boolean containsEmptyFields = false;
    +
    +                           // iterate over the tuple fields looking for 
empty ones
    +                           for (int pos = 0; pos < t.getArity(); pos++) {
    +
    +                                   String field = t.getField(pos);
    +                                   if (field == null || 
field.trim().isEmpty()) {
    +                                           containsEmptyFields = true;
    +
    +                                           // if an empty field is 
encountered, update the
    +                                           // accumulator
    +                                           this.emptyFieldCounter.add(pos);
    +                                   }
    +                           }
    +
    +                           return !containsEmptyFields;
    +                   }
    +           });
    +
    +           // Here, we could do further processing with the filtered 
lines...
    +           filteredLines.writeAsCsv(outputPath);
    +
    +           // execute program
    +           JobExecutionResult result = env.execute("Accumulator example");
    +
    +           // get the accumulator result via its registration key
    +           List<Integer> emptyFields = 
result.getAccumulatorResult(EMPTY_FIELD_ACCUMULATOR);
    +           System.out.format("Number of detected empty fields per column: 
%s\n",
    +                           emptyFields);
    +
    +   }
    +
    +   // 
*************************************************************************
    +   // UTIL METHODS
    +   // 
*************************************************************************
    +
    +   private static String filePath;
    +   private static String outputPath;
    +
    +   private static boolean parseParameters(String[] programArguments) {
    +
    +           if (programArguments.length > 0) {
    +                   if (programArguments.length == 2) {
    +                           filePath = programArguments[0];
    +                           outputPath = programArguments[1];
    +                   } else {
    +                           System.err
    +                                           .println("Usage: 
FilterAndCountIncompleteLines <input file path> <result path>");
    --- End diff --
    
    Can you put this call on a single line, i.e. `System.err.println(...)`, without 
the newline and tabs between `System.err` and `.println`?


> Add Java example program with Accumulators
> ------------------------------------------
>
>                 Key: FLINK-828
>                 URL: https://issues.apache.org/jira/browse/FLINK-828
>             Project: Flink
>          Issue Type: Bug
>            Reporter: GitHub Import
>              Labels: github-import, starter
>             Fix For: pre-apache
>
>
> We need a Java example program that shows the use of Accumulators.
> The new example program should follow the general layout of the other example 
> programs, i.e., run without parameters on default data, have the same code 
> structure, etc.
> ---------------- Imported from GitHub ----------------
> Url: https://github.com/stratosphere/stratosphere/issues/828
> Created by: [fhueske|https://github.com/fhueske]
> Labels: documentation, java api, 
> Milestone: Release 0.6 (unplanned)
> Created at: Sat May 17 12:46:44 CEST 2014
> State: open



--
This message was sent by Atlassian JIRA
(v6.2#6252)

Reply via email to