Repository: nifi
Updated Branches:
  refs/heads/master 3921ab827 -> 9cde92da1


NIFI-1656 Added locale support to ConvertAvroSchema and fixed locale problems 
in unit tests

Reviewed by Joe Witt ([email protected]). This closes #292


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/9cde92da
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/9cde92da
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/9cde92da

Branch: refs/heads/master
Commit: 9cde92da164393c50d74c4cc4c0d6bf82c975e9c
Parents: 3921ab8
Author: trkurc <[email protected]>
Authored: Sun Mar 20 18:25:04 2016 -0400
Committer: trkurc <[email protected]>
Committed: Mon Mar 21 01:44:46 2016 -0400

----------------------------------------------------------------------
 .../processors/kite/AvroRecordConverter.java    |  21 ++++
 .../nifi/processors/kite/ConvertAvroSchema.java |  39 +++++++-
 .../kite/TestAvroRecordConverter.java           |   3 +-
 .../processors/kite/TestConvertAvroSchema.java  | 100 +++++++++++++++++--
 4 files changed, 154 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/9cde92da/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/AvroRecordConverter.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/AvroRecordConverter.java
 
b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/AvroRecordConverter.java
index 68e6c98..f66e9ed 100644
--- 
a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/AvroRecordConverter.java
+++ 
b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/AvroRecordConverter.java
@@ -21,6 +21,7 @@ package org.apache.nifi.processors.kite;
 import java.io.IOException;
 import java.util.Collection;
 import java.util.List;
+import java.util.Locale;
 import java.util.Map;
 import java.util.Scanner;
 
@@ -43,6 +44,8 @@ public class AvroRecordConverter {
     private final Schema outputSchema;
     // Store this from output field to input field so we can look up by output.
     private final Map<String, String> fieldMapping;
+    private final Locale locale;
+    private static final Locale DEFAULT_LOCALE = Locale.getDefault();
 
     /**
      * @param inputSchema
@@ -55,6 +58,22 @@ public class AvroRecordConverter {
      */
     public AvroRecordConverter(Schema inputSchema, Schema outputSchema,
             Map<String, String> fieldMapping) {
+        this(inputSchema, outputSchema, fieldMapping, DEFAULT_LOCALE);
+    }
+
+    /**
+     * @param inputSchema
+     *            Schema of input record objects
+     * @param outputSchema
+     *            Schema of output record objects
+     * @param fieldMapping
+     *            Map from field name in input record to field name in output
+     *            record.
+     * @param locale
+     *            Locale applied to the Scanner that parses string content during conversion
+     */
+    public AvroRecordConverter(Schema inputSchema, Schema outputSchema,
+            Map<String, String> fieldMapping, Locale locale) {
         this.inputSchema = inputSchema;
         this.outputSchema = outputSchema;
         // Need to reverse this map.
@@ -63,6 +82,7 @@ public class AvroRecordConverter {
         for (Map.Entry<String, String> entry : fieldMapping.entrySet()) {
             this.fieldMapping.put(entry.getValue(), entry.getKey());
         }
+        this.locale = locale;
     }
 
     /**
@@ -224,6 +244,7 @@ public class AvroRecordConverter {
             // return questionable results when a String starts with a number
             // but then contains other content
             Scanner scanner = new Scanner(content.toString());
+            scanner.useLocale(locale);
             switch (nonNillOutput.getType()) {
             case LONG:
                 if (scanner.hasNextLong()) {

http://git-wip-us.apache.org/repos/asf/nifi/blob/9cde92da/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertAvroSchema.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertAvroSchema.java
 
b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertAvroSchema.java
index d64f5df..b0d3518 100644
--- 
a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertAvroSchema.java
+++ 
b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertAvroSchema.java
@@ -24,6 +24,7 @@ import java.io.OutputStream;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
+import java.util.Locale;
 import java.util.Map;
 import java.util.Set;
 import java.util.regex.Pattern;
@@ -34,6 +35,7 @@ import org.apache.avro.file.DataFileStream;
 import org.apache.avro.file.DataFileWriter;
 import org.apache.avro.generic.GenericData.Record;
 import org.apache.avro.generic.GenericDatumReader;
+import org.apache.commons.lang.LocaleUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.nifi.annotation.behavior.DynamicProperty;
 import org.apache.nifi.annotation.behavior.InputRequirement;
@@ -128,6 +130,27 @@ public class ConvertAvroSchema extends 
AbstractKiteProcessor {
         }
     };
 
+    public static final String DEFAULT_LOCALE_VALUE = "default"; // sentinel: use the JVM default locale
+    public static final Validator LOCALE_VALIDATOR = new Validator() { // accepts the sentinel or an available locale id
+        @Override
+        public ValidationResult validate(final String subject, final String value, final ValidationContext context) {
+            String reason = null; // stays null while the value is acceptable
+            if (!DEFAULT_LOCALE_VALUE.equals(value)) { // constant-first so a null value cannot throw here
+                try {
+                    final Locale locale = LocaleUtils.toLocale(value);
+                    if (locale == null) {
+                        reason = "null locale returned";
+                    } else if (!LocaleUtils.isAvailableLocale(locale)) {
+                        reason = "locale not available";
+                    }
+                } catch (final IllegalArgumentException e) { // toLocale rejects malformed identifiers
+                    reason = "invalid format for locale";
+                }
+            }
+            return new ValidationResult.Builder().subject(subject).input(value).explanation(reason).valid(reason == null).build();
+        }
+    };
+
     @VisibleForTesting
     static final PropertyDescriptor INPUT_SCHEMA = new 
PropertyDescriptor.Builder()
             .name("Input Schema").description("Avro Schema of Input Flowfiles")
@@ -141,10 +164,19 @@ public class ConvertAvroSchema extends 
AbstractKiteProcessor {
             
.addValidator(MAPPED_SCHEMA_VALIDATOR).expressionLanguageSupported(true)
             .required(true).build();
 
+    @VisibleForTesting
+    static final PropertyDescriptor LOCALE = new PropertyDescriptor.Builder() // locale handed to the record converter
+            .name("Locale")
+            .description("Locale to use for scanning data (see https://docs.oracle.com/javase/7/docs/api/java/util/Locale.html) " +
+                    "or \"" + DEFAULT_LOCALE_VALUE + "\" for JVM default")
+            .addValidator(LOCALE_VALIDATOR)
+            .defaultValue(DEFAULT_LOCALE_VALUE).build();
+
     private static final List<PropertyDescriptor> PROPERTIES = ImmutableList
             .<PropertyDescriptor> builder()
             .add(INPUT_SCHEMA)
-            .add(OUTPUT_SCHEMA).build();
+            .add(OUTPUT_SCHEMA)
+            .add(LOCALE).build();
 
     private static final Set<Relationship> RELATIONSHIPS = ImmutableSet
             .<Relationship> builder().add(SUCCESS).add(FAILURE).build();
@@ -240,8 +272,11 @@ public class ConvertAvroSchema extends 
AbstractKiteProcessor {
                 fieldMapping.put(entry.getKey().getName(), entry.getValue());
             }
         }
+        // Resolve the locale; compare by value because the property string need not be the same instance as the constant
+        final String localeProperty = context.getProperty(LOCALE).getValue();
+        final Locale locale = DEFAULT_LOCALE_VALUE.equals(localeProperty) ? Locale.getDefault() : LocaleUtils.toLocale(localeProperty);
         final AvroRecordConverter converter = new AvroRecordConverter(
-                inputSchema, outputSchema, fieldMapping);
+                inputSchema, outputSchema, fieldMapping, locale);
 
         final DataFileWriter<Record> writer = new DataFileWriter<>(
                 AvroUtil.newDatumWriter(outputSchema, Record.class));

http://git-wip-us.apache.org/repos/asf/nifi/blob/9cde92da/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestAvroRecordConverter.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestAvroRecordConverter.java
 
b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestAvroRecordConverter.java
index 1a4748f..11e86bf 100644
--- 
a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestAvroRecordConverter.java
+++ 
b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestAvroRecordConverter.java
@@ -27,6 +27,7 @@ import java.util.Map;
 import org.apache.avro.Schema;
 import org.apache.avro.SchemaBuilder;
 import org.apache.avro.generic.GenericData.Record;
+import org.apache.commons.lang.LocaleUtils;
 import org.codehaus.jackson.map.ObjectMapper;
 import org.junit.Test;
 
@@ -87,7 +88,7 @@ public class TestAvroRecordConverter {
                 .endRecord();
 
         AvroRecordConverter converter = new AvroRecordConverter(input, output,
-                EMPTY_MAPPING);
+                EMPTY_MAPPING, LocaleUtils.toLocale("en_US"));
 
         Record inputRecord = new Record(input);
         inputRecord.put("s1", null);

http://git-wip-us.apache.org/repos/asf/nifi/blob/9cde92da/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestConvertAvroSchema.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestConvertAvroSchema.java
 
b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestConvertAvroSchema.java
index 33f3a82..2da0513 100644
--- 
a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestConvertAvroSchema.java
+++ 
b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestConvertAvroSchema.java
@@ -22,13 +22,17 @@ import static 
org.apache.nifi.processors.kite.TestUtil.streamFor;
 
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
+import java.text.NumberFormat;
+import java.text.ParseException;
 import java.util.List;
+import java.util.Locale;
 
 import org.apache.avro.Schema;
 import org.apache.avro.SchemaBuilder;
 import org.apache.avro.file.DataFileStream;
 import org.apache.avro.generic.GenericData.Record;
 import org.apache.avro.generic.GenericDatumReader;
+import org.apache.commons.lang.LocaleUtils;
 import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
@@ -60,12 +64,89 @@ public class TestConvertAvroSchema {
                 INPUT_SCHEMA.toString());
         runner.setProperty(ConvertAvroSchema.OUTPUT_SCHEMA,
                 OUTPUT_SCHEMA.toString());
+        Locale locale = Locale.getDefault();
         runner.setProperty("primaryColor", "color");
         runner.assertValid();
 
+        NumberFormat format = NumberFormat.getInstance(locale);
+
+        // Two valid rows, and one invalid because "free" is not a double.
+        Record goodRecord1 = dataBasic("1", "blue", null, null);
+        Record goodRecord2 = dataBasic("2", "red", "yellow", 
format.format(5.5));
+        Record badRecord = dataBasic("3", "red", "yellow", "free");
+        List<Record> input = Lists.newArrayList(goodRecord1, goodRecord2,
+                badRecord);
+
+        runner.enqueue(streamFor(input));
+        runner.run();
+
+        long converted = runner.getCounterValue("Converted records");
+        long errors = runner.getCounterValue("Conversion errors");
+        Assert.assertEquals("Should convert 2 rows", 2, converted);
+        Assert.assertEquals("Should reject 1 rows", 1, errors);
+
+        runner.assertTransferCount("success", 1);
+        runner.assertTransferCount("failure", 1);
+
+        MockFlowFile incompatible = runner.getFlowFilesForRelationship(
+                "failure").get(0);
+        GenericDatumReader<Record> reader = new GenericDatumReader<Record>(
+                INPUT_SCHEMA);
+        DataFileStream<Record> stream = new DataFileStream<Record>(
+                new ByteArrayInputStream(
+                        runner.getContentAsByteArray(incompatible)), reader);
+        int count = 0;
+        for (Record r : stream) {
+            Assert.assertEquals(badRecord, r);
+            count++;
+        }
+        stream.close();
+        Assert.assertEquals(1, count);
+        Assert.assertEquals("Should accumulate error messages",
+                FAILURE_SUMMARY, incompatible.getAttribute("errors"));
+
+        GenericDatumReader<Record> successReader = new 
GenericDatumReader<Record>(
+                OUTPUT_SCHEMA);
+        DataFileStream<Record> successStream = new DataFileStream<Record>(
+                new ByteArrayInputStream(runner.getContentAsByteArray(runner
+                        .getFlowFilesForRelationship("success").get(0))),
+                successReader);
+        count = 0;
+        for (Record r : successStream) {
+            if (count == 0) {
+                Assert.assertEquals(convertBasic(goodRecord1, locale), r);
+            } else {
+                Assert.assertEquals(convertBasic(goodRecord2, locale), r);
+            }
+            count++;
+        }
+        successStream.close();
+        Assert.assertEquals(2, count);
+    }
+
+    @Test
+    public void testBasicConversionWithLocales() throws IOException { // runs the shared scenario once per explicit locale id
+        testBasicConversionWithLocale("en_US");
+        testBasicConversionWithLocale("fr_FR");
+    }
+
+    public void testBasicConversionWithLocale(String localeString) throws 
IOException {
+        TestRunner runner = TestRunners.newTestRunner(ConvertAvroSchema.class);
+        runner.assertNotValid();
+        runner.setProperty(ConvertAvroSchema.INPUT_SCHEMA,
+                INPUT_SCHEMA.toString());
+        runner.setProperty(ConvertAvroSchema.OUTPUT_SCHEMA,
+                OUTPUT_SCHEMA.toString());
+        Locale locale = LocaleUtils.toLocale(localeString);
+        runner.setProperty(ConvertAvroSchema.LOCALE, localeString);
+        runner.setProperty("primaryColor", "color");
+        runner.assertValid();
+
+        NumberFormat format = NumberFormat.getInstance(locale);
+
         // Two valid rows, and one invalid because "free" is not a double.
         Record goodRecord1 = dataBasic("1", "blue", null, null);
-        Record goodRecord2 = dataBasic("2", "red", "yellow", "5.5");
+        Record goodRecord2 = dataBasic("2", "red", "yellow", 
format.format(5.5));
         Record badRecord = dataBasic("3", "red", "yellow", "free");
         List<Record> input = Lists.newArrayList(goodRecord1, goodRecord2,
                 badRecord);
@@ -107,9 +188,9 @@ public class TestConvertAvroSchema {
         count = 0;
         for (Record r : successStream) {
             if (count == 0) {
-                Assert.assertEquals(convertBasic(goodRecord1), r);
+                Assert.assertEquals(convertBasic(goodRecord1, locale), r);
             } else {
-                Assert.assertEquals(convertBasic(goodRecord2), r);
+                Assert.assertEquals(convertBasic(goodRecord2, locale), r);
             }
             count++;
         }
@@ -163,15 +244,22 @@ public class TestConvertAvroSchema {
         Assert.assertEquals(2, count);
     }
 
-    private Record convertBasic(Record inputRecord) {
+    private Record convertBasic(Record inputRecord, Locale locale) { // builds the expected output record, parsing price with the given locale
         Record result = new Record(OUTPUT_SCHEMA);
         result.put("id", Long.parseLong(inputRecord.get("id").toString()));
         result.put("color", inputRecord.get("primaryColor").toString());
         if (inputRecord.get("price") == null) {
             result.put("price", null);
         } else {
-            result.put("price",
-                    Double.parseDouble(inputRecord.get("price").toString()));
+            final NumberFormat format = NumberFormat.getInstance(locale);
+            double price;
+            try {
+                price = 
format.parse(inputRecord.get("price").toString()).doubleValue();
+            } catch (ParseException e) {
+                // Shouldn't happen: the tests pass records whose price was formatted with this same locale
+                throw new RuntimeException(e);
+            }
+            result.put("price", price);
         }
         return result;
     }

Reply via email to