http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestWriteAvroResult.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestWriteAvroResult.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestWriteAvroResult.java new file mode 100644 index 0000000..2102813 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestWriteAvroResult.java @@ -0,0 +1,202 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.avro; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; +import java.sql.Date; +import java.sql.Time; +import java.sql.Timestamp; +import java.text.DateFormat; +import java.text.ParseException; +import java.text.SimpleDateFormat; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.TimeZone; + +import org.apache.avro.Schema; +import org.apache.avro.file.DataFileStream; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericDatumReader; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.generic.GenericData.Array; +import org.apache.avro.generic.GenericData.StringType; +import org.apache.nifi.serialization.SimpleRecordSchema; +import org.apache.nifi.serialization.record.MapRecord; +import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.serialization.record.RecordField; +import org.apache.nifi.serialization.record.RecordFieldType; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.serialization.record.RecordSet; +import org.junit.Test; + +/** Round-trip tests for WriteAvroResult: each test writes a record to a byte array and reads it back with Avro's DataFileStream. */ public class TestWriteAvroResult { + + /** Writes one record whose TIME, TIMESTAMP and DATE fields all hold 2017-04-04 14:20:33.000 (parsed in GMT), using the schema in logical-types.avsc, and checks the raw values Avro reads back. */ @Test + public void testLogicalTypes() throws IOException, ParseException { + final Schema schema = new Schema.Parser().parse(new File("src/test/resources/avro/logical-types.avsc")); + final WriteAvroResult writer = new WriteAvroResult(schema); + + final List<RecordField> fields = new ArrayList<>(); + fields.add(new RecordField("timeMillis", RecordFieldType.TIME.getDataType())); + fields.add(new RecordField("timeMicros", RecordFieldType.TIME.getDataType())); + fields.add(new
RecordField("timestampMillis", RecordFieldType.TIMESTAMP.getDataType())); + fields.add(new RecordField("timestampMicros", RecordFieldType.TIMESTAMP.getDataType())); + fields.add(new RecordField("date", RecordFieldType.DATE.getDataType())); + final RecordSchema recordSchema = new SimpleRecordSchema(fields); + + final String expectedTime = "2017-04-04 14:20:33.000"; + final DateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS"); + df.setTimeZone(TimeZone.getTimeZone("gmt")); + final long timeLong = df.parse(expectedTime).getTime(); + + /* every field is populated from the same instant, timeLong */ final Map<String, Object> values = new HashMap<>(); + values.put("timeMillis", new Time(timeLong)); + values.put("timeMicros", new Time(timeLong)); + values.put("timestampMillis", new Timestamp(timeLong)); + values.put("timestampMicros", new Timestamp(timeLong)); + values.put("date", new Date(timeLong)); + final Record record = new MapRecord(recordSchema, values); + + final byte[] data; + try (final ByteArrayOutputStream baos = new ByteArrayOutputStream()) { + writer.write(RecordSet.of(record.getSchema(), record), baos); + data = baos.toByteArray(); + } + + try (final InputStream in = new ByteArrayInputStream(data)) { + /* NOTE(review): only the ByteArrayInputStream is a try-with-resources resource; dataFileStream itself is not closed here */ final DataFileStream<GenericRecord> dataFileStream = new DataFileStream<>(in, new GenericDatumReader<GenericRecord>()); + final Schema avroSchema = dataFileStream.getSchema(); + GenericData.setStringType(avroSchema, StringType.String); + + final GenericRecord avroRecord = dataFileStream.next(); + /* 14:20:33 expressed as seconds since midnight */ final long secondsSinceMidnight = 33 + (20 * 60) + (14 * 60 * 60); + final long millisSinceMidnight = secondsSinceMidnight * 1000L; + + assertEquals((int) millisSinceMidnight, avroRecord.get("timeMillis")); + assertEquals(millisSinceMidnight * 1000L, avroRecord.get("timeMicros")); + assertEquals(timeLong, avroRecord.get("timestampMillis")); + assertEquals(timeLong * 1000L, avroRecord.get("timestampMicros")); + /* 17260 is the number of days from 1970-01-01 to 2017-04-04 */ assertEquals(17260, avroRecord.get("date")); + } + } + + + /** Writes a record with string, int, long, double, float, boolean, bytes, null, Integer-array and nested-record values, using the schema in datatypes.avsc, and compares what Avro reads back via assertMatch. */ @Test + public void testDataTypes() throws
IOException { + // TODO: Test Enums + final Schema schema = new Schema.Parser().parse(new File("src/test/resources/avro/datatypes.avsc")); + final WriteAvroResult writer = new WriteAvroResult(schema); + + final List<RecordField> subRecordFields = Collections.singletonList(new RecordField("field1", RecordFieldType.STRING.getDataType())); + final RecordSchema subRecordSchema = new SimpleRecordSchema(subRecordFields); + + final List<RecordField> fields = new ArrayList<>(); + fields.add(new RecordField("string", RecordFieldType.STRING.getDataType())); + fields.add(new RecordField("int", RecordFieldType.INT.getDataType())); + fields.add(new RecordField("long", RecordFieldType.LONG.getDataType())); + fields.add(new RecordField("double", RecordFieldType.DOUBLE.getDataType())); + fields.add(new RecordField("float", RecordFieldType.FLOAT.getDataType())); + fields.add(new RecordField("boolean", RecordFieldType.BOOLEAN.getDataType())); + /* NOTE(review): this calls getChoiceDataType on ARRAY, whereas the "array" field below uses getArrayDataType; confirm this is intended */ fields.add(new RecordField("bytes", RecordFieldType.ARRAY.getChoiceDataType(Collections.singletonList(RecordFieldType.BYTE.getDataType())))); + fields.add(new RecordField("nullOrLong", RecordFieldType.LONG.getDataType())); + fields.add(new RecordField("array", RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.INT.getDataType()))); + fields.add(new RecordField("record", RecordFieldType.RECORD.getRecordDataType(subRecordSchema))); + final RecordSchema recordSchema = new SimpleRecordSchema(fields); + + final Record innerRecord = new MapRecord(subRecordSchema, Collections.singletonMap("field1", "hello")); + + final Map<String, Object> values = new HashMap<>(); + values.put("string", "hello"); + values.put("int", 8); + values.put("long", 42L); + values.put("double", 3.14159D); + values.put("float", 1.23456F); + values.put("boolean", true); + /* NOTE(review): getBytes() without a charset argument uses the platform default charset */ values.put("bytes", AvroTypeUtil.convertByteArray("hello".getBytes())); + values.put("nullOrLong", null); + values.put("array", new Integer[] {1, 2, 3}); + values.put("record", innerRecord); + + final
Record record = new MapRecord(recordSchema, values); + + final byte[] data; + try (final ByteArrayOutputStream baos = new ByteArrayOutputStream()) { + writer.write(RecordSet.of(record.getSchema(), record), baos); + data = baos.toByteArray(); + } + + try (final InputStream in = new ByteArrayInputStream(data)) { + /* NOTE(review): as in testLogicalTypes, dataFileStream is not closed explicitly */ final DataFileStream<GenericRecord> dataFileStream = new DataFileStream<>(in, new GenericDatumReader<GenericRecord>()); + final Schema avroSchema = dataFileStream.getSchema(); + GenericData.setStringType(avroSchema, StringType.String); + + final GenericRecord avroRecord = dataFileStream.next(); + assertMatch(record, avroRecord); + } + } + + /** Asserts that every field named in the record's schema matches the same-named field of the Avro record. A String is compared against the Avro value's toString(); an Object[] is compared byte by byte against a ByteBuffer, or element by element against an Avro Array; a byte[] is wrapped in a ByteBuffer; a nested Record recurses; anything else is compared with assertEquals directly. */ private void assertMatch(final Record record, final GenericRecord avroRecord) { + for (final String fieldName : record.getSchema().getFieldNames()) { + Object avroValue = avroRecord.get(fieldName); + final Object recordValue = record.getValue(fieldName); + + if (recordValue instanceof String) { + assertNotNull(fieldName + " should not have been null", avroValue); + /* normalise the Avro value to a String; the comparison itself happens in the final else branch below */ avroValue = avroValue.toString(); + } + + if (recordValue instanceof Object[] && avroValue instanceof ByteBuffer) { + final ByteBuffer bb = (ByteBuffer) avroValue; + final Object[] objectArray = (Object[]) recordValue; + assertEquals("For field " + fieldName + ", byte buffer remaining should have been " + objectArray.length + " but was " + bb.remaining(), + objectArray.length, bb.remaining()); + + for (int i = 0; i < objectArray.length; i++) { + /* relative get(): each call consumes the next byte of the buffer */ assertEquals(objectArray[i], bb.get()); + } + } else if (recordValue instanceof Object[]) { + assertTrue(fieldName + " should have been instanceof Array", avroValue instanceof Array); + final Array<?> avroArray = (Array<?>) avroValue; + final Object[] recordArray = (Object[]) recordValue; + assertEquals(fieldName + " not equal", recordArray.length, avroArray.size()); + for (int i = 0; i < recordArray.length; i++) { + assertEquals(fieldName + "[" + i + "] not equal", recordArray[i], avroArray.get(i)); + } + } else if
(recordValue instanceof byte[]) { + final ByteBuffer bb = ByteBuffer.wrap((byte[]) recordValue); + assertEquals(fieldName + " not equal", bb, avroValue); + } else if (recordValue instanceof Record) { + /* nested record: compare its fields recursively */ assertMatch((Record) recordValue, (GenericRecord) avroValue); + } else { + assertEquals(fieldName + " not equal", recordValue, avroValue); + } + } + } +}
http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestCSVRecordReader.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestCSVRecordReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestCSVRecordReader.java index 1e53d89..cb790f1 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestCSVRecordReader.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestCSVRecordReader.java @@ -20,18 +20,24 @@ package org.apache.nifi.csv; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; +import java.io.ByteArrayInputStream; import java.io.File; import java.io.FileInputStream; import java.io.IOException; import java.io.InputStream; -import java.util.Arrays; -import java.util.HashMap; +import java.sql.Date; +import java.util.ArrayList; +import java.util.Calendar; import java.util.List; -import java.util.Map; +import java.util.TimeZone; +import org.apache.commons.csv.CSVFormat; import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.serialization.MalformedRecordException; +import org.apache.nifi.serialization.SimpleRecordSchema; import org.apache.nifi.serialization.record.DataType; +import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.serialization.record.RecordField; import org.apache.nifi.serialization.record.RecordFieldType; import 
org.apache.nifi.serialization.record.RecordSchema; import org.junit.Assert; @@ -39,21 +45,50 @@ import org.junit.Test; import org.mockito.Mockito; public class TestCSVRecordReader { - private final DataType stringDataType = RecordFieldType.STRING.getDataType(); private final DataType doubleDataType = RecordFieldType.DOUBLE.getDataType(); - private final DataType timeDataType = RecordFieldType.TIME.getDataType(); + private final CSVFormat format = CSVFormat.DEFAULT.withFirstRecordAsHeader().withTrim().withQuote('"'); + + private List<RecordField> getDefaultFields() { + final List<RecordField> fields = new ArrayList<>(); + for (final String fieldName : new String[] {"id", "name", "balance", "address", "city", "state", "zipCode", "country"}) { + fields.add(new RecordField(fieldName, RecordFieldType.STRING.getDataType())); + } + return fields; + } + + @Test + public void testDate() throws IOException, MalformedRecordException { + final String text = "date\n11/30/1983"; + + final List<RecordField> fields = new ArrayList<>(); + fields.add(new RecordField("date", RecordFieldType.DATE.getDataType())); + final RecordSchema schema = new SimpleRecordSchema(fields); + + try (final InputStream bais = new ByteArrayInputStream(text.getBytes()); + final CSVRecordReader reader = new CSVRecordReader(bais, Mockito.mock(ComponentLog.class), schema, format, + "MM/dd/yyyy", RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat())) { + + final Record record = reader.nextRecord(); + final java.sql.Date date = (Date) record.getValue("date"); + final Calendar calendar = Calendar.getInstance(TimeZone.getTimeZone("gmt")); + calendar.setTimeInMillis(date.getTime()); + + assertEquals(1983, calendar.get(Calendar.YEAR)); + assertEquals(10, calendar.get(Calendar.MONTH)); + assertEquals(30, calendar.get(Calendar.DAY_OF_MONTH)); + } + } @Test public void testSimpleParse() throws IOException, MalformedRecordException { - final Map<String, DataType> overrides = new 
HashMap<>(); - overrides.put("balance", doubleDataType); - overrides.put("other", timeDataType); + final List<RecordField> fields = getDefaultFields(); + fields.replaceAll(f -> f.getFieldName().equals("balance") ? new RecordField("balance", doubleDataType) : f); - try (final InputStream fis = new FileInputStream(new File("src/test/resources/csv/single-bank-account.csv"))) { - final CSVRecordReader reader = new CSVRecordReader(fis, null, overrides); + final RecordSchema schema = new SimpleRecordSchema(fields); - final RecordSchema schema = reader.getSchema(); - verifyFields(schema); + try (final InputStream fis = new FileInputStream(new File("src/test/resources/csv/single-bank-account.csv"))) { + final CSVRecordReader reader = new CSVRecordReader(fis, Mockito.mock(ComponentLog.class), schema, format, + RecordFieldType.DATE.getDefaultFormat(), RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat()); final Object[] record = reader.nextRecord().getValues(); final Object[] expectedValues = new Object[] {"1", "John Doe", 4750.89D, "123 My Street", "My City", "MS", "11111", "USA"}; @@ -65,14 +100,14 @@ public class TestCSVRecordReader { @Test public void testMultipleRecords() throws IOException, MalformedRecordException { - final Map<String, DataType> overrides = new HashMap<>(); - overrides.put("balance", doubleDataType); + final List<RecordField> fields = getDefaultFields(); + fields.replaceAll(f -> f.getFieldName().equals("balance") ? 
new RecordField("balance", doubleDataType) : f); - try (final InputStream fis = new FileInputStream(new File("src/test/resources/csv/multi-bank-account.csv"))) { - final CSVRecordReader reader = new CSVRecordReader(fis, null, overrides); + final RecordSchema schema = new SimpleRecordSchema(fields); - final RecordSchema schema = reader.getSchema(); - verifyFields(schema); + try (final InputStream fis = new FileInputStream(new File("src/test/resources/csv/multi-bank-account.csv"))) { + final CSVRecordReader reader = new CSVRecordReader(fis, Mockito.mock(ComponentLog.class), schema, format, + RecordFieldType.DATE.getDefaultFormat(), RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat()); final Object[] firstRecord = reader.nextRecord().getValues(); final Object[] firstExpectedValues = new Object[] {"1", "John Doe", 4750.89D, "123 My Street", "My City", "MS", "11111", "USA"}; @@ -88,14 +123,14 @@ public class TestCSVRecordReader { @Test public void testExtraWhiteSpace() throws IOException, MalformedRecordException { - final Map<String, DataType> overrides = new HashMap<>(); - overrides.put("balance", doubleDataType); + final List<RecordField> fields = getDefaultFields(); + fields.replaceAll(f -> f.getFieldName().equals("balance") ? 
new RecordField("balance", doubleDataType) : f); - try (final InputStream fis = new FileInputStream(new File("src/test/resources/csv/extra-white-space.csv"))) { - final CSVRecordReader reader = new CSVRecordReader(fis, Mockito.mock(ComponentLog.class), overrides); + final RecordSchema schema = new SimpleRecordSchema(fields); - final RecordSchema schema = reader.getSchema(); - verifyFields(schema); + try (final InputStream fis = new FileInputStream(new File("src/test/resources/csv/extra-white-space.csv"))) { + final CSVRecordReader reader = new CSVRecordReader(fis, Mockito.mock(ComponentLog.class), schema, format, + RecordFieldType.DATE.getDefaultFormat(), RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat()); final Object[] firstRecord = reader.nextRecord().getValues(); final Object[] firstExpectedValues = new Object[] {"1", "John Doe", 4750.89D, "123 My Street", "My City", "MS", "11111", "USA"}; @@ -108,15 +143,4 @@ public class TestCSVRecordReader { assertNull(reader.nextRecord()); } } - - private void verifyFields(final RecordSchema schema) { - final List<String> fieldNames = schema.getFieldNames(); - final List<String> expectedFieldNames = Arrays.asList("id", "name", "balance", "address", "city", "state", "zipCode", "country"); - assertEquals(expectedFieldNames, fieldNames); - - final List<DataType> dataTypes = schema.getDataTypes(); - final List<DataType> expectedDataTypes = Arrays.asList(stringDataType, stringDataType, doubleDataType, - stringDataType, stringDataType, stringDataType, stringDataType, stringDataType); - assertEquals(expectedDataTypes, dataTypes); - } } http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestWriteCSVResult.java ---------------------------------------------------------------------- diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestWriteCSVResult.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestWriteCSVResult.java index 04f8479..1e8997b 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestWriteCSVResult.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestWriteCSVResult.java @@ -26,12 +26,16 @@ import java.nio.charset.StandardCharsets; import java.sql.Date; import java.sql.Time; import java.sql.Timestamp; +import java.text.DateFormat; import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.TimeZone; +import org.apache.commons.csv.CSVFormat; +import org.apache.commons.csv.QuoteMode; import org.apache.nifi.serialization.SimpleRecordSchema; import org.apache.nifi.serialization.record.DataType; import org.apache.nifi.serialization.record.MapRecord; @@ -47,7 +51,8 @@ public class TestWriteCSVResult { @Test public void testDataTypes() throws IOException { - final WriteCSVResult result = new WriteCSVResult(RecordFieldType.DATE.getDefaultFormat(), RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat()); + final CSVFormat csvFormat = CSVFormat.DEFAULT.withQuoteMode(QuoteMode.ALL).withRecordSeparator("\n"); + final WriteCSVResult result = new WriteCSVResult(csvFormat, RecordFieldType.DATE.getDefaultFormat(), RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat()); final StringBuilder headerBuilder = new StringBuilder(); final List<RecordField> fields = new 
ArrayList<>(); @@ -57,7 +62,7 @@ public class TestWriteCSVResult { possibleTypes.add(RecordFieldType.INT.getDataType()); possibleTypes.add(RecordFieldType.LONG.getDataType()); - fields.add(new RecordField(fieldType.name().toLowerCase(), fieldType.getDataType(possibleTypes))); + fields.add(new RecordField(fieldType.name().toLowerCase(), fieldType.getChoiceDataType(possibleTypes))); } else { fields.add(new RecordField(fieldType.name().toLowerCase(), fieldType.getDataType())); } @@ -81,7 +86,7 @@ public class TestWriteCSVResult { valueMap.put("date", new Date(now)); valueMap.put("time", new Time(now)); valueMap.put("timestamp", new Timestamp(now)); - valueMap.put("object", null); + valueMap.put("record", null); valueMap.put("choice", 48L); valueMap.put("array", null); @@ -105,9 +110,9 @@ public class TestWriteCSVResult { final StringBuilder expectedBuilder = new StringBuilder(); expectedBuilder.append("\"string\",\"true\",\"1\",\"c\",\"8\",\"9\",\"8\",\"8\",\"8.0\",\"8.0\","); - final String dateValue = new SimpleDateFormat(RecordFieldType.DATE.getDefaultFormat()).format(now); - final String timeValue = new SimpleDateFormat(RecordFieldType.TIME.getDefaultFormat()).format(now); - final String timestampValue = new SimpleDateFormat(RecordFieldType.TIMESTAMP.getDefaultFormat()).format(now); + final String dateValue = getDateFormat(RecordFieldType.DATE.getDefaultFormat()).format(now); + final String timeValue = getDateFormat(RecordFieldType.TIME.getDefaultFormat()).format(now); + final String timestampValue = getDateFormat(RecordFieldType.TIMESTAMP.getDefaultFormat()).format(now); expectedBuilder.append('"').append(dateValue).append('"').append(','); expectedBuilder.append('"').append(timeValue).append('"').append(','); @@ -118,4 +123,10 @@ public class TestWriteCSVResult { assertEquals(expectedValues, values); } + private DateFormat getDateFormat(final String format) { + final DateFormat df = new SimpleDateFormat(format); + df.setTimeZone(TimeZone.getTimeZone("gmt")); + 
return df; + } + } http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/grok/TestGrokRecordReader.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/grok/TestGrokRecordReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/grok/TestGrokRecordReader.java index 3757ab1..a741ad1 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/grok/TestGrokRecordReader.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/grok/TestGrokRecordReader.java @@ -28,7 +28,6 @@ import java.io.FileInputStream; import java.io.IOException; import java.io.InputStream; import java.nio.charset.StandardCharsets; -import java.util.Collections; import org.apache.nifi.serialization.MalformedRecordException; import org.apache.nifi.serialization.record.Record; @@ -46,7 +45,7 @@ public class TestGrokRecordReader { grok.addPatternFromFile("src/main/resources/default-grok-patterns.txt"); grok.compile("%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level} %{GREEDYDATA:message}"); - final GrokRecordReader deserializer = new GrokRecordReader(fis, grok, Collections.emptyMap()); + final GrokRecordReader deserializer = new GrokRecordReader(fis, grok, null); final String[] logLevels = new String[] {"INFO", "WARN", "ERROR", "FATAL", "FINE"}; final String[] messages = new String[] {"Test Message 1", "Red", "Green", "Blue", "Yellow"}; @@ -76,7 +75,7 @@ public class 
TestGrokRecordReader { final String msg = "2016-08-04 13:26:32,473 INFO [Leader Election Notification Thread-1] o.a.n.LoggerClass \n" + "org.apache.nifi.exception.UnitTestException: Testing to ensure we are able to capture stack traces"; final InputStream bais = new ByteArrayInputStream(msg.getBytes(StandardCharsets.UTF_8)); - final GrokRecordReader deserializer = new GrokRecordReader(bais, grok, Collections.emptyMap()); + final GrokRecordReader deserializer = new GrokRecordReader(bais, grok, null); final Object[] values = deserializer.nextRecord().getValues(); @@ -99,7 +98,7 @@ public class TestGrokRecordReader { grok.addPatternFromFile("src/main/resources/default-grok-patterns.txt"); grok.compile("%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level} \\[%{DATA:thread}\\] %{DATA:class} %{GREEDYDATA:message}"); - final GrokRecordReader deserializer = new GrokRecordReader(fis, grok, Collections.emptyMap()); + final GrokRecordReader deserializer = new GrokRecordReader(fis, grok, null); final String[] logLevels = new String[] {"INFO", "INFO", "INFO", "WARN", "WARN"}; @@ -123,7 +122,7 @@ public class TestGrokRecordReader { grok.addPatternFromFile("src/main/resources/default-grok-patterns.txt"); grok.compile("%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level} \\[%{DATA:thread}\\] %{DATA:class} %{GREEDYDATA:message}?"); - final GrokRecordReader deserializer = new GrokRecordReader(fis, grok, Collections.emptyMap()); + final GrokRecordReader deserializer = new GrokRecordReader(fis, grok, null); final String[] logLevels = new String[] {"INFO", "INFO", "ERROR", "WARN", "WARN"}; @@ -155,7 +154,7 @@ public class TestGrokRecordReader { grok.addPatternFromFile("src/main/resources/default-grok-patterns.txt"); grok.compile("%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level} %{GREEDYDATA:message}"); - final GrokRecordReader deserializer = new GrokRecordReader(fis, grok, Collections.emptyMap()); + final GrokRecordReader deserializer = new GrokRecordReader(fis, grok, null); final String[] 
logLevels = new String[] {"INFO", "ERROR", "INFO"}; final String[] messages = new String[] {"message without stack trace", http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonPathRowRecordReader.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonPathRowRecordReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonPathRowRecordReader.java index fa41396..11e2828 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonPathRowRecordReader.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonPathRowRecordReader.java @@ -26,17 +26,18 @@ import java.io.File; import java.io.FileInputStream; import java.io.IOException; import java.io.InputStream; +import java.util.ArrayList; import java.util.Arrays; -import java.util.Collections; import java.util.LinkedHashMap; import java.util.List; -import java.util.Map; import java.util.stream.Collectors; import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.serialization.MalformedRecordException; +import org.apache.nifi.serialization.SimpleRecordSchema; import org.apache.nifi.serialization.record.DataType; import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.serialization.record.RecordField; import org.apache.nifi.serialization.record.RecordFieldType; import 
org.apache.nifi.serialization.record.RecordSchema; import org.junit.Assert; @@ -47,6 +48,10 @@ import org.mockito.Mockito; import com.jayway.jsonpath.JsonPath; public class TestJsonPathRowRecordReader { + private final String dateFormat = RecordFieldType.DATE.getDefaultFormat(); + private final String timeFormat = RecordFieldType.TIME.getDefaultFormat(); + private final String timestampFormat = RecordFieldType.TIMESTAMP.getDefaultFormat(); + private final LinkedHashMap<String, JsonPath> allJsonPaths = new LinkedHashMap<>(); @Before @@ -63,12 +68,36 @@ public class TestJsonPathRowRecordReader { allJsonPaths.put("country", JsonPath.compile("$.country")); } + + private List<RecordField> getDefaultFields() { + final List<RecordField> fields = new ArrayList<>(); + fields.add(new RecordField("id", RecordFieldType.INT.getDataType())); + fields.add(new RecordField("name", RecordFieldType.STRING.getDataType())); + fields.add(new RecordField("balance", RecordFieldType.DOUBLE.getDataType())); + fields.add(new RecordField("address", RecordFieldType.STRING.getDataType())); + fields.add(new RecordField("city", RecordFieldType.STRING.getDataType())); + fields.add(new RecordField("state", RecordFieldType.STRING.getDataType())); + fields.add(new RecordField("zipCode", RecordFieldType.STRING.getDataType())); + fields.add(new RecordField("country", RecordFieldType.STRING.getDataType())); + return fields; + } + + private RecordSchema getAccountSchema() { + final List<RecordField> accountFields = new ArrayList<>(); + accountFields.add(new RecordField("id", RecordFieldType.INT.getDataType())); + accountFields.add(new RecordField("balance", RecordFieldType.DOUBLE.getDataType())); + + final RecordSchema accountSchema = new SimpleRecordSchema(accountFields); + return accountSchema; + } + + @Test public void testReadArray() throws IOException, MalformedRecordException { - try (final InputStream in = new FileInputStream(new File("src/test/resources/json/bank-account-array.json")); - final 
JsonPathRowRecordReader reader = new JsonPathRowRecordReader(allJsonPaths, Collections.emptyMap(), in, Mockito.mock(ComponentLog.class))) { + final RecordSchema schema = new SimpleRecordSchema(getDefaultFields()); - final RecordSchema schema = reader.getSchema(); + try (final InputStream in = new FileInputStream(new File("src/test/resources/json/bank-account-array.json")); + final JsonPathRowRecordReader reader = new JsonPathRowRecordReader(allJsonPaths, schema, in, Mockito.mock(ComponentLog.class), dateFormat, timeFormat, timestampFormat)) { final List<String> fieldNames = schema.getFieldNames(); final List<String> expectedFieldNames = Arrays.asList(new String[] {"id", "name", "balance", "address", "city", "state", "zipCode", "country"}); @@ -91,10 +120,10 @@ public class TestJsonPathRowRecordReader { @Test public void testSingleJsonElement() throws IOException, MalformedRecordException { - try (final InputStream in = new FileInputStream(new File("src/test/resources/json/single-bank-account.json")); - final JsonPathRowRecordReader reader = new JsonPathRowRecordReader(allJsonPaths, Collections.emptyMap(), in, Mockito.mock(ComponentLog.class))) { + final RecordSchema schema = new SimpleRecordSchema(getDefaultFields()); - final RecordSchema schema = reader.getSchema(); + try (final InputStream in = new FileInputStream(new File("src/test/resources/json/single-bank-account.json")); + final JsonPathRowRecordReader reader = new JsonPathRowRecordReader(allJsonPaths, schema, in, Mockito.mock(ComponentLog.class), dateFormat, timeFormat, timestampFormat)) { final List<String> fieldNames = schema.getFieldNames(); final List<String> expectedFieldNames = Arrays.asList(new String[] {"id", "name", "balance", "address", "city", "state", "zipCode", "country"}); @@ -119,17 +148,20 @@ public class TestJsonPathRowRecordReader { final LinkedHashMap<String, JsonPath> jsonPaths = new LinkedHashMap<>(allJsonPaths); jsonPaths.put("account", JsonPath.compile("$.account")); - try (final 
InputStream in = new FileInputStream(new File("src/test/resources/json/single-element-nested.json")); - final JsonPathRowRecordReader reader = new JsonPathRowRecordReader(jsonPaths, Collections.emptyMap(), in, Mockito.mock(ComponentLog.class))) { + final DataType accountType = RecordFieldType.RECORD.getRecordDataType(getAccountSchema()); + final List<RecordField> fields = getDefaultFields(); + fields.add(new RecordField("account", accountType)); + final RecordSchema schema = new SimpleRecordSchema(fields); - final RecordSchema schema = reader.getSchema(); + try (final InputStream in = new FileInputStream(new File("src/test/resources/json/single-element-nested.json")); + final JsonPathRowRecordReader reader = new JsonPathRowRecordReader(jsonPaths, schema, in, Mockito.mock(ComponentLog.class), dateFormat, timeFormat, timestampFormat)) { final List<String> fieldNames = schema.getFieldNames(); final List<String> expectedFieldNames = Arrays.asList(new String[] {"id", "name", "balance", "address", "city", "state", "zipCode", "country", "account"}); assertEquals(expectedFieldNames, fieldNames); final List<RecordFieldType> dataTypes = schema.getDataTypes().stream().map(dt -> dt.getFieldType()).collect(Collectors.toList()); - final List<RecordFieldType> expectedTypes = Arrays.asList(new RecordFieldType[] {RecordFieldType.INT, RecordFieldType.STRING, RecordFieldType.STRING, + final List<RecordFieldType> expectedTypes = Arrays.asList(new RecordFieldType[] {RecordFieldType.INT, RecordFieldType.STRING, RecordFieldType.DOUBLE, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.RECORD}); assertEquals(expectedTypes, dataTypes); @@ -152,10 +184,15 @@ public class TestJsonPathRowRecordReader { final LinkedHashMap<String, JsonPath> jsonPaths = new LinkedHashMap<>(allJsonPaths); jsonPaths.put("accounts", JsonPath.compile("$.accounts")); - try (final InputStream in = new FileInputStream(new 
File("src/test/resources/json/single-element-nested-array.json")); - final JsonPathRowRecordReader reader = new JsonPathRowRecordReader(jsonPaths, Collections.emptyMap(), in, Mockito.mock(ComponentLog.class))) { + final DataType accountRecordType = RecordFieldType.RECORD.getRecordDataType(getAccountSchema()); + final DataType accountsType = RecordFieldType.ARRAY.getArrayDataType(accountRecordType); + final List<RecordField> fields = getDefaultFields(); + fields.add(new RecordField("accounts", accountsType)); + final RecordSchema schema = new SimpleRecordSchema(fields); - final RecordSchema schema = reader.getSchema(); + + try (final InputStream in = new FileInputStream(new File("src/test/resources/json/single-element-nested-array.json")); + final JsonPathRowRecordReader reader = new JsonPathRowRecordReader(jsonPaths, schema, in, Mockito.mock(ComponentLog.class), dateFormat, timeFormat, timestampFormat)) { final List<String> fieldNames = schema.getFieldNames(); final List<String> expectedFieldNames = Arrays.asList(new String[] { @@ -163,7 +200,7 @@ public class TestJsonPathRowRecordReader { assertEquals(expectedFieldNames, fieldNames); final List<RecordFieldType> dataTypes = schema.getDataTypes().stream().map(dt -> dt.getFieldType()).collect(Collectors.toList()); - final List<RecordFieldType> expectedTypes = Arrays.asList(new RecordFieldType[] {RecordFieldType.INT, RecordFieldType.STRING, RecordFieldType.STRING, + final List<RecordFieldType> expectedTypes = Arrays.asList(new RecordFieldType[] {RecordFieldType.INT, RecordFieldType.STRING, RecordFieldType.DOUBLE, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.ARRAY}); assertEquals(expectedTypes, dataTypes); @@ -177,17 +214,17 @@ public class TestJsonPathRowRecordReader { final Object[] array = (Object[]) lastRecord; assertEquals(2, array.length); final Object firstElement = array[0]; - assertTrue(firstElement instanceof Map); + 
assertTrue(firstElement instanceof Record); - final Map<?, ?> firstMap = (Map<?, ?>) firstElement; - assertEquals(42, firstMap.get("id")); - assertEquals(4750.89D, firstMap.get("balance")); + final Record firstRecord = (Record) firstElement; + assertEquals(42, firstRecord.getValue("id")); + assertEquals(4750.89D, firstRecord.getValue("balance")); final Object secondElement = array[1]; - assertTrue(secondElement instanceof Map); - final Map<?, ?> secondMap = (Map<?, ?>) secondElement; - assertEquals(43, secondMap.get("id")); - assertEquals(48212.38D, secondMap.get("balance")); + assertTrue(secondElement instanceof Record); + final Record secondRecord = (Record) secondElement; + assertEquals(43, secondRecord.getValue("id")); + assertEquals(48212.38D, secondRecord.getValue("balance")); assertNull(reader.nextRecord()); } @@ -195,10 +232,10 @@ public class TestJsonPathRowRecordReader { @Test public void testReadArrayDifferentSchemas() throws IOException, MalformedRecordException { - try (final InputStream in = new FileInputStream(new File("src/test/resources/json/bank-account-array-different-schemas.json")); - final JsonPathRowRecordReader reader = new JsonPathRowRecordReader(allJsonPaths, Collections.emptyMap(), in, Mockito.mock(ComponentLog.class))) { + final RecordSchema schema = new SimpleRecordSchema(getDefaultFields()); - final RecordSchema schema = reader.getSchema(); + try (final InputStream in = new FileInputStream(new File("src/test/resources/json/bank-account-array-different-schemas.json")); + final JsonPathRowRecordReader reader = new JsonPathRowRecordReader(allJsonPaths, schema, in, Mockito.mock(ComponentLog.class), dateFormat, timeFormat, timestampFormat)) { final List<String> fieldNames = schema.getFieldNames(); final List<String> expectedFieldNames = Arrays.asList(new String[] {"id", "name", "balance", "address", "city", "state", "zipCode", "country"}); @@ -226,11 +263,14 @@ public class TestJsonPathRowRecordReader { public void 
testReadArrayDifferentSchemasWithOverride() throws IOException, MalformedRecordException { final LinkedHashMap<String, JsonPath> jsonPaths = new LinkedHashMap<>(allJsonPaths); jsonPaths.put("address2", JsonPath.compile("$.address2")); - final Map<String, DataType> typeOverrides = Collections.singletonMap("address2", RecordFieldType.STRING.getDataType()); + + final List<RecordField> fields = getDefaultFields(); + fields.add(new RecordField("address2", RecordFieldType.STRING.getDataType())); + final RecordSchema schema = new SimpleRecordSchema(fields); + try (final InputStream in = new FileInputStream(new File("src/test/resources/json/bank-account-array-different-schemas.json")); - final JsonPathRowRecordReader reader = new JsonPathRowRecordReader(jsonPaths, typeOverrides, in, Mockito.mock(ComponentLog.class))) { - final RecordSchema schema = reader.getSchema(); + final JsonPathRowRecordReader reader = new JsonPathRowRecordReader(jsonPaths, schema, in, Mockito.mock(ComponentLog.class), dateFormat, timeFormat, timestampFormat)) { final List<String> fieldNames = schema.getFieldNames(); final List<String> expectedFieldNames = Arrays.asList(new String[] {"id", "name", "balance", "address", "city", "state", "zipCode", "country", "address2"}); @@ -259,10 +299,13 @@ public class TestJsonPathRowRecordReader { final LinkedHashMap<String, JsonPath> jsonPaths = new LinkedHashMap<>(allJsonPaths); jsonPaths.put("accountIds", JsonPath.compile("$.accountIds")); - try (final InputStream in = new FileInputStream(new File("src/test/resources/json/primitive-type-array.json")); - final JsonPathRowRecordReader reader = new JsonPathRowRecordReader(jsonPaths, Collections.emptyMap(), in, Mockito.mock(ComponentLog.class))) { + final List<RecordField> fields = getDefaultFields(); + final DataType idsType = RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.INT.getDataType()); + fields.add(new RecordField("accountIds", idsType)); + final RecordSchema schema = new 
SimpleRecordSchema(fields); - final RecordSchema schema = reader.getSchema(); + try (final InputStream in = new FileInputStream(new File("src/test/resources/json/primitive-type-array.json")); + final JsonPathRowRecordReader reader = new JsonPathRowRecordReader(jsonPaths, schema, in, Mockito.mock(ComponentLog.class), dateFormat, timeFormat, timestampFormat)) { final List<String> fieldNames = schema.getFieldNames(); final List<String> expectedFieldNames = Arrays.asList(new String[] {"id", "name", "balance", "address", "city", "state", "zipCode", "country", "accountIds"}); http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonTreeRowRecordReader.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonTreeRowRecordReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonTreeRowRecordReader.java index c5ee0e3..2422206 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonTreeRowRecordReader.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonTreeRowRecordReader.java @@ -25,8 +25,8 @@ import java.io.File; import java.io.FileInputStream; import java.io.IOException; import java.io.InputStream; +import java.util.ArrayList; import java.util.Arrays; -import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -34,8 +34,10 @@ import 
java.util.stream.Collectors; import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.serialization.MalformedRecordException; +import org.apache.nifi.serialization.SimpleRecordSchema; import org.apache.nifi.serialization.record.DataType; import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.serialization.record.RecordField; import org.apache.nifi.serialization.record.RecordFieldType; import org.apache.nifi.serialization.record.RecordSchema; import org.junit.Assert; @@ -43,13 +45,38 @@ import org.junit.Test; import org.mockito.Mockito; public class TestJsonTreeRowRecordReader { + private final String dateFormat = RecordFieldType.DATE.getDefaultFormat(); + private final String timeFormat = RecordFieldType.TIME.getDefaultFormat(); + private final String timestampFormat = RecordFieldType.TIMESTAMP.getDefaultFormat(); + + private List<RecordField> getDefaultFields() { + final List<RecordField> fields = new ArrayList<>(); + fields.add(new RecordField("id", RecordFieldType.INT.getDataType())); + fields.add(new RecordField("name", RecordFieldType.STRING.getDataType())); + fields.add(new RecordField("balance", RecordFieldType.DOUBLE.getDataType())); + fields.add(new RecordField("address", RecordFieldType.STRING.getDataType())); + fields.add(new RecordField("city", RecordFieldType.STRING.getDataType())); + fields.add(new RecordField("state", RecordFieldType.STRING.getDataType())); + fields.add(new RecordField("zipCode", RecordFieldType.STRING.getDataType())); + fields.add(new RecordField("country", RecordFieldType.STRING.getDataType())); + return fields; + } + + private RecordSchema getAccountSchema() { + final List<RecordField> accountFields = new ArrayList<>(); + accountFields.add(new RecordField("id", RecordFieldType.INT.getDataType())); + accountFields.add(new RecordField("balance", RecordFieldType.DOUBLE.getDataType())); + + final RecordSchema accountSchema = new SimpleRecordSchema(accountFields); + return accountSchema; + } @Test public 
void testReadArray() throws IOException, MalformedRecordException { - try (final InputStream in = new FileInputStream(new File("src/test/resources/json/bank-account-array.json")); - final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, Mockito.mock(ComponentLog.class), Collections.emptyMap())) { + final RecordSchema schema = new SimpleRecordSchema(getDefaultFields()); - final RecordSchema schema = reader.getSchema(); + try (final InputStream in = new FileInputStream(new File("src/test/resources/json/bank-account-array.json")); + final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, Mockito.mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) { final List<String> fieldNames = schema.getFieldNames(); final List<String> expectedFieldNames = Arrays.asList(new String[] {"id", "name", "balance", "address", "city", "state", "zipCode", "country"}); @@ -72,10 +99,10 @@ public class TestJsonTreeRowRecordReader { @Test public void testSingleJsonElement() throws IOException, MalformedRecordException { - try (final InputStream in = new FileInputStream(new File("src/test/resources/json/single-bank-account.json")); - final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, Mockito.mock(ComponentLog.class), Collections.emptyMap())) { + final RecordSchema schema = new SimpleRecordSchema(getDefaultFields()); - final RecordSchema schema = reader.getSchema(); + try (final InputStream in = new FileInputStream(new File("src/test/resources/json/single-bank-account.json")); + final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, Mockito.mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) { final List<String> fieldNames = schema.getFieldNames(); final List<String> expectedFieldNames = Arrays.asList(new String[] {"id", "name", "balance", "address", "city", "state", "zipCode", "country"}); @@ -95,14 +122,14 @@ public class TestJsonTreeRowRecordReader { @Test public void 
testElementWithNestedData() throws IOException, MalformedRecordException { - try (final InputStream in = new FileInputStream(new File("src/test/resources/json/single-element-nested.json")); - final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, Mockito.mock(ComponentLog.class), Collections.emptyMap())) { - - final RecordSchema schema = reader.getSchema(); + final DataType accountType = RecordFieldType.RECORD.getRecordDataType(getAccountSchema()); + final List<RecordField> fields = getDefaultFields(); + fields.add(new RecordField("account", accountType)); + fields.remove(new RecordField("balance", RecordFieldType.DOUBLE.getDataType())); + final RecordSchema schema = new SimpleRecordSchema(fields); - final List<String> fieldNames = schema.getFieldNames(); - final List<String> expectedFieldNames = Arrays.asList(new String[] {"id", "name", "address", "city", "state", "zipCode", "country", "account"}); - assertEquals(expectedFieldNames, fieldNames); + try (final InputStream in = new FileInputStream(new File("src/test/resources/json/single-element-nested.json")); + final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, Mockito.mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) { final List<RecordFieldType> dataTypes = schema.getDataTypes().stream().map(dt -> dt.getFieldType()).collect(Collectors.toList()); final List<RecordFieldType> expectedTypes = Arrays.asList(new RecordFieldType[] {RecordFieldType.INT, RecordFieldType.STRING, @@ -125,10 +152,16 @@ public class TestJsonTreeRowRecordReader { @Test public void testElementWithNestedArray() throws IOException, MalformedRecordException { - try (final InputStream in = new FileInputStream(new File("src/test/resources/json/single-element-nested-array.json")); - final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, Mockito.mock(ComponentLog.class), Collections.emptyMap())) { + final DataType accountRecordType = 
RecordFieldType.RECORD.getRecordDataType(getAccountSchema()); + final DataType accountsType = RecordFieldType.ARRAY.getArrayDataType(accountRecordType); + + final List<RecordField> fields = getDefaultFields(); + fields.add(new RecordField("accounts", accountsType)); + fields.remove(new RecordField("balance", RecordFieldType.DOUBLE.getDataType())); + final RecordSchema schema = new SimpleRecordSchema(fields); - final RecordSchema schema = reader.getSchema(); + try (final InputStream in = new FileInputStream(new File("src/test/resources/json/single-element-nested-array.json")); + final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, Mockito.mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) { final List<String> fieldNames = schema.getFieldNames(); final List<String> expectedFieldNames = Arrays.asList(new String[] { @@ -153,10 +186,10 @@ public class TestJsonTreeRowRecordReader { @Test public void testReadArrayDifferentSchemas() throws IOException, MalformedRecordException { - try (final InputStream in = new FileInputStream(new File("src/test/resources/json/bank-account-array-different-schemas.json")); - final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, Mockito.mock(ComponentLog.class), Collections.emptyMap())) { + final RecordSchema schema = new SimpleRecordSchema(getDefaultFields()); - final RecordSchema schema = reader.getSchema(); + try (final InputStream in = new FileInputStream(new File("src/test/resources/json/bank-account-array-different-schemas.json")); + final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, Mockito.mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) { final List<String> fieldNames = schema.getFieldNames(); final List<String> expectedFieldNames = Arrays.asList(new String[] {"id", "name", "balance", "address", "city", "state", "zipCode", "country"}); @@ -185,10 +218,12 @@ public class TestJsonTreeRowRecordReader { final Map<String, DataType> 
overrides = new HashMap<>(); overrides.put("address2", RecordFieldType.STRING.getDataType()); - try (final InputStream in = new FileInputStream(new File("src/test/resources/json/bank-account-array-different-schemas.json")); - final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, Mockito.mock(ComponentLog.class), overrides)) { + final List<RecordField> fields = getDefaultFields(); + fields.add(new RecordField("address2", RecordFieldType.STRING.getDataType())); + final RecordSchema schema = new SimpleRecordSchema(fields); - final RecordSchema schema = reader.getSchema(); + try (final InputStream in = new FileInputStream(new File("src/test/resources/json/bank-account-array-different-schemas.json")); + final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, Mockito.mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) { final List<String> fieldNames = schema.getFieldNames(); final List<String> expectedFieldNames = Arrays.asList(new String[] {"id", "name", "balance", "address", "city", "state", "zipCode", "country", "address2"}); @@ -214,13 +249,10 @@ public class TestJsonTreeRowRecordReader { @Test public void testReadArrayDifferentSchemasWithOptionalElementOverridden() throws IOException, MalformedRecordException { - final Map<String, DataType> overrides = new HashMap<>(); - overrides.put("balance", RecordFieldType.DOUBLE.getDataType()); + final RecordSchema schema = new SimpleRecordSchema(getDefaultFields()); try (final InputStream in = new FileInputStream(new File("src/test/resources/json/bank-account-array-optional-balance.json")); - final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, Mockito.mock(ComponentLog.class), overrides)) { - - final RecordSchema schema = reader.getSchema(); + final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, Mockito.mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) { final List<String> fieldNames = schema.getFieldNames(); 
final List<String> expectedFieldNames = Arrays.asList(new String[] {"id", "name", "balance", "address", "city", "state", "zipCode", "country"}); @@ -247,8 +279,22 @@ public class TestJsonTreeRowRecordReader { @Test public void testReadUnicodeCharacters() throws IOException, MalformedRecordException { + + final List<RecordField> fromFields = new ArrayList<>(); + fromFields.add(new RecordField("id", RecordFieldType.LONG.getDataType())); + fromFields.add(new RecordField("name", RecordFieldType.STRING.getDataType())); + final RecordSchema fromSchema = new SimpleRecordSchema(fromFields); + final DataType fromType = RecordFieldType.RECORD.getRecordDataType(fromSchema); + + final List<RecordField> fields = new ArrayList<>(); + fields.add(new RecordField("created_at", RecordFieldType.STRING.getDataType())); + fields.add(new RecordField("id", RecordFieldType.LONG.getDataType())); + fields.add(new RecordField("unicode", RecordFieldType.STRING.getDataType())); + fields.add(new RecordField("from", fromType)); + final RecordSchema schema = new SimpleRecordSchema(fields); + try (final InputStream in = new FileInputStream(new File("src/test/resources/json/json-with-unicode.json")); - final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, Mockito.mock(ComponentLog.class), Collections.emptyMap())) { + final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, Mockito.mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) { final Object[] firstRecordValues = reader.nextRecord().getValues(); http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestWriteJsonResult.java ---------------------------------------------------------------------- diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestWriteJsonResult.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestWriteJsonResult.java index f9849ba..6119d36 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestWriteJsonResult.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestWriteJsonResult.java @@ -27,12 +27,14 @@ import java.nio.file.Paths; import java.sql.Date; import java.sql.Time; import java.sql.Timestamp; +import java.text.DateFormat; import java.text.ParseException; import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.TimeZone; import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.serialization.SimpleRecordSchema; @@ -60,14 +62,17 @@ public class TestWriteJsonResult { possibleTypes.add(RecordFieldType.INT.getDataType()); possibleTypes.add(RecordFieldType.LONG.getDataType()); - fields.add(new RecordField(fieldType.name().toLowerCase(), fieldType.getDataType(possibleTypes))); + fields.add(new RecordField(fieldType.name().toLowerCase(), fieldType.getChoiceDataType(possibleTypes))); } else { fields.add(new RecordField(fieldType.name().toLowerCase(), fieldType.getDataType())); } } final RecordSchema schema = new SimpleRecordSchema(fields); - final long time = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss.SSS").parse("2017/01/01 17:00:00.000").getTime(); + final DateFormat df = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss.SSS"); + df.setTimeZone(TimeZone.getTimeZone("gmt")); + final long time = 
df.parse("2017/01/01 17:00:00.000").getTime(); + final Map<String, Object> valueMap = new LinkedHashMap<>(); valueMap.put("string", "string"); valueMap.put("boolean", true); http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/avro/datatypes.avsc ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/avro/datatypes.avsc b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/avro/datatypes.avsc new file mode 100644 index 0000000..cc7f60e --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/avro/datatypes.avsc @@ -0,0 +1,47 @@ +{ + "namespace": "nifi", + "name": "data_types", + "type": "record", + "fields": [ + { + "name": "string", + "type": "string" + }, { + "name": "int", + "type": "int" + }, { + "name": "long", + "type": "long" + }, { + "name": "double", + "type": "double" + }, { + "name": "float", + "type": "float" + }, { + "name": "boolean", + "type": "boolean" + }, { + "name": "bytes", + "type": "bytes" + }, { + "name": "nullOrLong", + "type": [ "null", "long" ] + }, { + "name": "array", + "type" : { + "type": "array", + "items": "int" + } + }, { + "name": "record", + "type": { + "type": "record", + "name": "subRecord", + "fields": [ + { "name": "field1", "type": "string" } + ] + } + } + ] +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/avro/logical-types.avsc 
---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/avro/logical-types.avsc b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/avro/logical-types.avsc new file mode 100644 index 0000000..d8315b2 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/avro/logical-types.avsc @@ -0,0 +1,34 @@ +{ + "namespace": "nifi", + "name": "data_types", + "type": "record", + "fields": [ + { + "name" : "timeMillis", + "type": { + "type": "int", + "logicalType": "time-millis" + } + }, { + "name" : "timeMicros", "type": { + "type" : "long", + "logicalType" : "time-micros" + } + }, { + "name" : "timestampMillis", "type": { + "type" : "long", + "logicalType" : "timestamp-millis" + } + }, { + "name" : "timestampMicros", "type": { + "type" : "long", + "logicalType" : "timestamp-micros" + } + }, { + "name" : "date", "type": { + "type" : "int", + "logicalType" : "date" + } + } + ] +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-standard-services/nifi-schema-registry-service-api/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-schema-registry-service-api/pom.xml b/nifi-nar-bundles/nifi-standard-services/nifi-schema-registry-service-api/pom.xml new file mode 100644 index 0000000..265eb71 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-services/nifi-schema-registry-service-api/pom.xml @@ -0,0 +1,32 @@ +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <!-- Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with this + work for additional information regarding copyright ownership. The ASF licenses + this file to You under the Apache License, Version 2.0 (the "License"); you + may not use this file except in compliance with the License. You may obtain + a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless + required by applicable law or agreed to in writing, software distributed + under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES + OR CONDITIONS OF ANY KIND, either express or implied. See the License for + the specific language governing permissions and limitations under the License. --> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-standard-services</artifactId> + <version>1.2.0-SNAPSHOT</version> + </parent> + + <artifactId>nifi-schema-registry-service-api</artifactId> + <packaging>jar</packaging> + <dependencies> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-api</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-record-serialization-service-api</artifactId> + </dependency> + </dependencies> +</project> http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-standard-services/nifi-schema-registry-service-api/src/main/java/org/apache/nifi/schemaregistry/services/SchemaRegistry.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-schema-registry-service-api/src/main/java/org/apache/nifi/schemaregistry/services/SchemaRegistry.java b/nifi-nar-bundles/nifi-standard-services/nifi-schema-registry-service-api/src/main/java/org/apache/nifi/schemaregistry/services/SchemaRegistry.java new file mode 100644 index 
0000000..68c2461 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-services/nifi-schema-registry-service-api/src/main/java/org/apache/nifi/schemaregistry/services/SchemaRegistry.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.schemaregistry.services; + +import java.util.Map; + +import org.apache.nifi.controller.ControllerService; +import org.apache.nifi.serialization.record.RecordSchema; + +/** + * Represents {@link ControllerService} strategy to expose internal and/or + * integrate with external Schema Registry + */ +public interface SchemaRegistry extends ControllerService, AutoCloseable { + + public static final String SCHEMA_NAME_ATTR = "schema.name"; + + + /** + * Retrieves and returns the textual representation of the schema based on + * the provided name of the schema available in Schema Registry. Will throw + * a runtime exception if schema cannot be found. + */ + String retrieveSchemaText(String schemaName); + + /** + * Retrieves and returns the textual representation of the schema based on + * the provided name of the schema available in Schema Registry and optional + * additional attributes. Will throw a runtime exception if schema cannot + * be found. 
+ */ + String retrieveSchemaText(String schemaName, Map<String, String> attributes); + + + RecordSchema retrieveSchema(String schemaName); + + + RecordSchema retrieveSchema(String schemaName, Map<String, String> attributes); +} http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-standard-services/nifi-standard-services-api-nar/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-standard-services-api-nar/pom.xml b/nifi-nar-bundles/nifi-standard-services/nifi-standard-services-api-nar/pom.xml index eae3515..5cee52e 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-standard-services-api-nar/pom.xml +++ b/nifi-nar-bundles/nifi-standard-services/nifi-standard-services-api-nar/pom.xml @@ -58,6 +58,11 @@ </dependency> <dependency> <groupId>org.apache.nifi</groupId> + <artifactId>nifi-schema-registry-service-api</artifactId> + <scope>compile</scope> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> <artifactId>nifi-record-serialization-service-api</artifactId> <scope>compile</scope> </dependency> http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-standard-services/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/pom.xml b/nifi-nar-bundles/nifi-standard-services/pom.xml index 3948a1b..4fac7d2 100644 --- a/nifi-nar-bundles/nifi-standard-services/pom.xml +++ b/nifi-nar-bundles/nifi-standard-services/pom.xml @@ -35,6 +35,7 @@ <module>nifi-dbcp-service-bundle</module> <module>nifi-hbase-client-service-api</module> <module>nifi-hbase_1_1_2-client-service-bundle</module> + <module>nifi-schema-registry-service-api</module> <module>nifi-record-serialization-service-api</module> <module>nifi-record-serialization-services-bundle</module> </modules> http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/pom.xml 
---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 057832b..0173b04 100644 --- a/pom.xml +++ b/pom.xml @@ -681,7 +681,7 @@ language governing permissions and limitations under the License. --> <dependency> <groupId>org.apache.avro</groupId> <artifactId>avro</artifactId> - <version>1.7.7</version> + <version>1.8.1</version> </dependency> <dependency> <groupId>com.sun.jersey</groupId> @@ -921,6 +921,11 @@ language governing permissions and limitations under the License. --> </dependency> <dependency> <groupId>org.apache.nifi</groupId> + <artifactId>nifi-registry-service</artifactId> + <version>1.2.0-SNAPSHOT</version> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> <artifactId>nifi-resources</artifactId> <version>1.2.0-SNAPSHOT</version> <classifier>resources</classifier> @@ -971,6 +976,11 @@ language governing permissions and limitations under the License. --> </dependency> <dependency> <groupId>org.apache.nifi</groupId> + <artifactId>nifi-schema-registry-service-api</artifactId> + <version>1.2.0-SNAPSHOT</version> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> <artifactId>nifi-distributed-cache-services-nar</artifactId> <version>1.2.0-SNAPSHOT</version> <type>nar</type>
