This is an automated email from the ASF dual-hosted git repository. thw pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/apex-malhar.git
The following commit(s) were added to refs/heads/master by this push:
     new aa5c7c4 APEXMALHAR-2550 Made NycTaxiDataReader and NycTaxiCsvParser more resilient to data with bad format
aa5c7c4 is described below

commit aa5c7c42f86966d2954dfb988a85099ce2c9b5c8
Author: David Yan <david...@apache.org>
AuthorDate: Tue Nov 28 20:32:46 2017 -0800

    APEXMALHAR-2550 Made NycTaxiDataReader and NycTaxiCsvParser more resilient to data with bad format
---
 .../apache/apex/examples/nyctaxi/NycTaxiCsvParser.java | 10 ++++++++--
 .../apex/examples/nyctaxi/NycTaxiDataReader.java | 18 ++++++++++--------
 2 files changed, 18 insertions(+), 10 deletions(-)

diff --git a/examples/nyctaxi/src/main/java/org/apache/apex/examples/nyctaxi/NycTaxiCsvParser.java b/examples/nyctaxi/src/main/java/org/apache/apex/examples/nyctaxi/NycTaxiCsvParser.java
index 3e13e76..7ecf62a 100644
--- a/examples/nyctaxi/src/main/java/org/apache/apex/examples/nyctaxi/NycTaxiCsvParser.java
+++ b/examples/nyctaxi/src/main/java/org/apache/apex/examples/nyctaxi/NycTaxiCsvParser.java
@@ -21,6 +21,9 @@ package org.apache.apex.examples.nyctaxi;
 import java.util.HashMap;
 import java.util.Map;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import org.apache.commons.lang3.StringUtils;
 
 import com.datatorrent.api.DefaultInputPort;
@@ -40,17 +43,20 @@ public class NycTaxiCsvParser extends BaseOperator
     @Override
     public void process(String tuple)
     {
-      String[] values = tuple.split(",");
+      String[] values = tuple.split(",", -1);
       Map<String, String> outputTuple = new HashMap<>();
-      if (StringUtils.isNumeric(values[0])) {
+      if (values.length > 18 && StringUtils.isNumeric(values[0])) {
         outputTuple.put("pickup_time", values[1]);
         outputTuple.put("pickup_lon", values[5]);
         outputTuple.put("pickup_lat", values[6]);
         outputTuple.put("total_fare", values[18]);
         output.emit(outputTuple);
+      } else {
+        LOG.warn("Dropping tuple with unrecognized format: {}", tuple);
       }
     }
   };
 
   public final transient DefaultOutputPort<Map<String, String>> output = new DefaultOutputPort<>();
+  private static final Logger LOG = LoggerFactory.getLogger(NycTaxiCsvParser.class);
 }
diff --git a/examples/nyctaxi/src/main/java/org/apache/apex/examples/nyctaxi/NycTaxiDataReader.java b/examples/nyctaxi/src/main/java/org/apache/apex/examples/nyctaxi/NycTaxiDataReader.java
index b2168f4..1d4114c 100644
--- a/examples/nyctaxi/src/main/java/org/apache/apex/examples/nyctaxi/NycTaxiDataReader.java
+++ b/examples/nyctaxi/src/main/java/org/apache/apex/examples/nyctaxi/NycTaxiDataReader.java
@@ -54,14 +54,16 @@ public class NycTaxiDataReader extends LineByLineFileInputOperator
   protected String readEntity() throws IOException
   {
     String line = super.readEntity();
-    String[] fields = line.split(",");
-    String timestamp = fields[1];
-    if (currentTimestamp == null) {
-      currentTimestamp = timestamp;
-    } else if (timestamp != currentTimestamp) {
-      // suspend emit until the next streaming window when timestamp is different from the current timestamp.
-      suspendEmit = true;
-      currentTimestamp = timestamp;
+    String[] fields = line.split(",", -1);
+    if (fields.length > 1) {
+      String timestamp = fields[1];
+      if (currentTimestamp == null) {
+        currentTimestamp = timestamp;
+      } else if (timestamp != currentTimestamp) {
+        // suspend emit until the next streaming window when timestamp is different from the current timestamp.
+        suspendEmit = true;
+        currentTimestamp = timestamp;
+      }
     }
     return line;
   }

-- 
To stop receiving notification emails like this one, please contact ['"commits@apex.apache.org" <commits@apex.apache.org>'].