Repository: incubator-gobblin
Updated Branches:
  refs/heads/master c101493f8 -> 492b93d54
[GOBBLIN-223][GOBBLIN-177] Changed CsvToJsonConverter to throw DataConversionException

Closes #2074 from aditya1105/csvConverter

Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/492b93d5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/492b93d5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/492b93d5

Branch: refs/heads/master
Commit: 492b93d5467c7c978b0de798a9da72ec3ef8c30b
Parents: c101493
Author: aditya1105 <[email protected]>
Authored: Tue Aug 22 21:42:04 2017 -0700
Committer: Abhishek Tiwari <[email protected]>
Committed: Tue Aug 22 21:42:04 2017 -0700

----------------------------------------------------------------------
 .../converter/csv/CsvToJsonConverter.java | 48 ++++++++++----------
 1 file changed, 23 insertions(+), 25 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/492b93d5/gobblin-core/src/main/java/org/apache/gobblin/converter/csv/CsvToJsonConverter.java
----------------------------------------------------------------------
diff --git a/gobblin-core/src/main/java/org/apache/gobblin/converter/csv/CsvToJsonConverter.java b/gobblin-core/src/main/java/org/apache/gobblin/converter/csv/CsvToJsonConverter.java
index 8672b6b..7c231f3 100644
--- a/gobblin-core/src/main/java/org/apache/gobblin/converter/csv/CsvToJsonConverter.java
+++ b/gobblin-core/src/main/java/org/apache/gobblin/converter/csv/CsvToJsonConverter.java
@@ -55,41 +55,39 @@ public class CsvToJsonConverter extends Converter<String, JsonArray, String, Jso
 * Takes in a record with format String and splits the data based on SOURCE_SCHEMA_DELIMITER * Uses the inputSchema and the split record to convert the record to a JsonObject * @return a JsonObject representing the record - * @throws IOException + * @throws DataConversionException */ @Override public Iterable<JsonObject>
convertRecord(JsonArray outputSchema, String inputRecord, WorkUnitState workUnit) throws DataConversionException { - String strDelimiter = workUnit.getProp(ConfigurationKeys.CONVERTER_CSV_TO_JSON_DELIMITER); - if (Strings.isNullOrEmpty(strDelimiter)) { - throw new IllegalArgumentException("Delimiter cannot be empty"); - } - InputStreamCSVReader reader = new InputStreamCSVReader(inputRecord, strDelimiter.charAt(0), - workUnit.getProp(ConfigurationKeys.CONVERTER_CSV_TO_JSON_ENCLOSEDCHAR, - ConfigurationKeys.DEFAULT_CONVERTER_CSV_TO_JSON_ENCLOSEDCHAR).charAt(0)); - List<String> recordSplit; try { + String strDelimiter = workUnit.getProp(ConfigurationKeys.CONVERTER_CSV_TO_JSON_DELIMITER); + if (Strings.isNullOrEmpty(strDelimiter)) { + throw new IllegalArgumentException("Delimiter cannot be empty"); + } + InputStreamCSVReader reader = new InputStreamCSVReader(inputRecord, strDelimiter.charAt(0), + workUnit.getProp(ConfigurationKeys.CONVERTER_CSV_TO_JSON_ENCLOSEDCHAR, ConfigurationKeys.DEFAULT_CONVERTER_CSV_TO_JSON_ENCLOSEDCHAR).charAt(0)); + List<String> recordSplit; recordSplit = Lists.newArrayList(reader.splitRecord()); - } catch (IOException e) { - throw new DataConversionException(e); - } - JsonObject outputRecord = new JsonObject(); + JsonObject outputRecord = new JsonObject(); - for (int i = 0; i < outputSchema.size(); i++) { - if (i < recordSplit.size()) { - if (recordSplit.get(i) == null) { - outputRecord.add(outputSchema.get(i).getAsJsonObject().get("columnName").getAsString(), JsonNull.INSTANCE); - } else if (recordSplit.get(i).isEmpty() || recordSplit.get(i).toLowerCase().equals(NULL)) { - outputRecord.add(outputSchema.get(i).getAsJsonObject().get("columnName").getAsString(), JsonNull.INSTANCE); + for (int i = 0; i < outputSchema.size(); i++) { + if (i < recordSplit.size()) { + if (recordSplit.get(i) == null) { + outputRecord.add(outputSchema.get(i).getAsJsonObject().get("columnName").getAsString(), JsonNull.INSTANCE); + } else if 
(recordSplit.get(i).isEmpty() || recordSplit.get(i).toLowerCase().equals(NULL)) { + outputRecord.add(outputSchema.get(i).getAsJsonObject().get("columnName").getAsString(), JsonNull.INSTANCE); + } else { + outputRecord.addProperty(outputSchema.get(i).getAsJsonObject().get("columnName").getAsString(), recordSplit.get(i)); + } } else { - outputRecord.addProperty(outputSchema.get(i).getAsJsonObject().get("columnName").getAsString(), - recordSplit.get(i)); + outputRecord.add(outputSchema.get(i).getAsJsonObject().get("columnName").getAsString(), JsonNull.INSTANCE); } - } else { - outputRecord.add(outputSchema.get(i).getAsJsonObject().get("columnName").getAsString(), JsonNull.INSTANCE); } - } - return new SingleRecordIterable<>(outputRecord); + return new SingleRecordIterable<>(outputRecord); + } catch (Exception e) { + throw new DataConversionException(e); + } } }
