Repository: nifi Updated Branches: refs/heads/master ded18b94d -> 6a1854c97
http://git-wip-us.apache.org/repos/asf/nifi/blob/6a1854c9/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/TransformAvroToJson.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/TransformAvroToJson.java b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/TransformAvroToJson.java new file mode 100644 index 0000000..ba45563 --- /dev/null +++ b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/TransformAvroToJson.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.schemaregistry.processors; + +import java.io.InputStream; +import java.io.OutputStream; +import java.util.Collections; +import java.util.Map; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.flowfile.attributes.CoreAttributes; + + +@Tags({ "registry", "schema", "avro", "json", "transform" }) +@CapabilityDescription("Transforms AVRO content of the Flow File to JSON using the schema provided by the Schema Registry Service.") +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +public final class TransformAvroToJson extends AbstractContentTransformer { + /** + * Reads the incoming content as an Avro {@link GenericRecord} for the given schema, writes that record to {@code out} through {@code JsonUtils}, and returns a single-entry attribute map that sets the MIME type to {@code application/json}. + */ + @Override + protected Map<String, String> transform(InputStream in, OutputStream out, InvocationContextProperties contextProperties, Schema schema) { + final GenericRecord decoded = AvroUtils.read(in, schema); + JsonUtils.write(decoded, out); + final String mimeTypeKey = CoreAttributes.MIME_TYPE.key(); + return Collections.singletonMap(mimeTypeKey, "application/json"); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/6a1854c9/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/TransformCSVToAvro.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/TransformCSVToAvro.java b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/TransformCSVToAvro.java new file mode 100644 index 0000000..f44e440 --- /dev/null +++ 
b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/TransformCSVToAvro.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.schemaregistry.processors; + +import java.io.InputStream; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.processor.ProcessContext; + +@Tags({ "csv", "avro", "transform", "registry", "schema" }) +@InputRequirement(Requirement.INPUT_REQUIRED) +@CapabilityDescription("Transforms CSV content of the Flow File to Avro using the schema provided by the Schema Registry 
Service.") +public final class TransformCSVToAvro extends AbstractCSVTransformer { + + private static final List<PropertyDescriptor> DESCRIPTORS; + + static { + final List<PropertyDescriptor> supported = new ArrayList<PropertyDescriptor>(BASE_CSV_DESCRIPTORS); + supported.add(QUOTE); + DESCRIPTORS = Collections.unmodifiableList(supported); + } + + private volatile char quoteChar; + + /** + * Delegates to the superclass and then caches the first character of the configured {@code QUOTE} property value for use by {@code transform}. + */ + @Override + @OnScheduled + public void onScheduled(ProcessContext context) { + super.onScheduled(context); + final String configuredQuote = context.getProperty(QUOTE).getValue(); + this.quoteChar = configuredQuote.charAt(0); + } + + /** + * Returns the unmodifiable descriptor list built in the static initializer: the base CSV descriptors followed by {@code QUOTE}. + */ + @Override + public List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return DESCRIPTORS; + } + + /** + * Parses the CSV input into a {@link GenericRecord} using the inherited delimiter and the cached quote character, writes the record to {@code out} through {@code AvroUtils}, and returns a single-entry attribute map that sets the MIME type to {@code binary/avro}. + */ + @Override + protected Map<String, String> transform(InputStream in, OutputStream out, InvocationContextProperties contextProperties, Schema schema) { + final GenericRecord parsed = CSVUtils.read(in, this.delimiter, schema, this.quoteChar); + AvroUtils.write(parsed, out); + return Collections.singletonMap(CoreAttributes.MIME_TYPE.key(), "binary/avro"); + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/6a1854c9/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/TransformCSVToJson.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/TransformCSVToJson.java b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/TransformCSVToJson.java new file mode 100644 index 0000000..2ce9fbe --- /dev/null +++ b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/TransformCSVToJson.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor 
license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.schemaregistry.processors; + +import java.io.InputStream; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.processor.ProcessContext; + +@Tags({ "csv", "json", "transform", "registry", "schema" }) +@InputRequirement(Requirement.INPUT_REQUIRED) +@CapabilityDescription("Transforms CSV content of the Flow File to JSON using the schema provided by the Schema Registry Service.") +public final class TransformCSVToJson extends AbstractCSVTransformer { + + private static final List<PropertyDescriptor> DESCRIPTORS; + + static { + List<PropertyDescriptor> descriptors = new ArrayList<PropertyDescriptor>(); + 
descriptors.addAll(BASE_CSV_DESCRIPTORS); + descriptors.add(QUOTE); + DESCRIPTORS = Collections.unmodifiableList(descriptors); + } + + private volatile char quoteChar; + + /** + * + */ + @Override + @OnScheduled + public void onScheduled(ProcessContext context) { + super.onScheduled(context); + this.quoteChar = context.getProperty(QUOTE).getValue().charAt(0); + } + + /** + * + */ + @Override + public List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return DESCRIPTORS; + } + + /** + * + */ + @Override + protected Map<String, String> transform(InputStream in, OutputStream out, InvocationContextProperties contextProperties, Schema schema) { + GenericRecord avroRecord = CSVUtils.read(in, this.delimiter, schema, this.quoteChar); + JsonUtils.write(avroRecord, out); + return Collections.singletonMap(CoreAttributes.MIME_TYPE.key(), "application/json"); + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/6a1854c9/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/TransformJsonToAvro.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/TransformJsonToAvro.java b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/TransformJsonToAvro.java new file mode 100644 index 0000000..f54a4b5 --- /dev/null +++ b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/TransformJsonToAvro.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.schemaregistry.processors; + +import java.io.InputStream; +import java.io.OutputStream; +import java.util.Collections; +import java.util.Map; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.flowfile.attributes.CoreAttributes; + +@Tags({ "registry", "schema", "avro", "json", "transform" }) +@CapabilityDescription("Transforms JSON content of the Flow File to Avro using the schema provided by the Schema Registry Service.") +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +public final class TransformJsonToAvro extends AbstractContentTransformer { + /** + * Reads the incoming JSON content into a {@link GenericRecord} for the given schema, writes that record to {@code out} through {@code AvroUtils}, and returns a single-entry attribute map that sets the MIME type to {@code binary/avro}. + */ + @Override + protected Map<String, String> transform(InputStream in, OutputStream out, InvocationContextProperties contextProperties, Schema schema) { + final GenericRecord parsed = JsonUtils.read(in, schema); + AvroUtils.write(parsed, out); + final String mimeTypeKey = CoreAttributes.MIME_TYPE.key(); + return Collections.singletonMap(mimeTypeKey, "binary/avro"); + } +} \ No newline at end of file 
http://git-wip-us.apache.org/repos/asf/nifi/blob/6a1854c9/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/TransformJsonToCSV.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/TransformJsonToCSV.java b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/TransformJsonToCSV.java new file mode 100644 index 0000000..c026570 --- /dev/null +++ b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/TransformJsonToCSV.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.schemaregistry.processors; + +import java.io.InputStream; +import java.io.OutputStream; +import java.util.Collections; +import java.util.Map; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.flowfile.attributes.CoreAttributes; + +@Tags({ "registry", "schema", "csv", "json", "transform" }) +@CapabilityDescription("Transforms JSON content of the Flow File to CSV using the schema provided by the Schema Registry Service.") +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +public final class TransformJsonToCSV extends AbstractCSVTransformer { + /** + * Reads the incoming JSON content into a {@link GenericRecord} for the given schema, writes that record to {@code out} through {@code CSVUtils} using the inherited delimiter, and returns a single-entry attribute map that sets the MIME type to {@code text/csv}. + */ + @Override + protected Map<String, String> transform(InputStream in, OutputStream out, InvocationContextProperties contextProperties, Schema schema) { + final GenericRecord parsed = JsonUtils.read(in, schema); + CSVUtils.write(parsed, this.delimiter, out); + final String mimeTypeKey = CoreAttributes.MIME_TYPE.key(); + return Collections.singletonMap(mimeTypeKey, "text/csv"); + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/6a1854c9/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor new file mode 100644 index 0000000..0bb067e --- /dev/null +++ b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor @@ -0,0 +1,21 @@ +# Licensed to the Apache 
Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +org.apache.nifi.schemaregistry.processors.TransformJsonToAvro +org.apache.nifi.schemaregistry.processors.TransformAvroToJson +org.apache.nifi.schemaregistry.processors.TransformCSVToAvro +org.apache.nifi.schemaregistry.processors.TransformCSVToJson +org.apache.nifi.schemaregistry.processors.TransformAvroToCSV +org.apache.nifi.schemaregistry.processors.TransformJsonToCSV +org.apache.nifi.schemaregistry.processors.ExtractAvroFields \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/6a1854c9/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/java/org/apache/nifi/schemaregistry/processors/TransformersTest.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/java/org/apache/nifi/schemaregistry/processors/TransformersTest.java b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/java/org/apache/nifi/schemaregistry/processors/TransformersTest.java new file mode 100644 index 0000000..dabbc17 --- /dev/null +++ b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/java/org/apache/nifi/schemaregistry/processors/TransformersTest.java 
@@ -0,0 +1,195 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.schemaregistry.processors; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.nio.file.FileSystems; +import java.nio.file.Files; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; +import org.junit.Test; +import org.junit.runner.RunWith; + +import junitparams.JUnitParamsRunner; +import junitparams.Parameters; + +/** Round-trip tests for the CSV, JSON and Avro conversion helpers (CSVUtils, JsonUtils, AvroUtils); all text is encoded and decoded as UTF-8. */ +@RunWith(JUnitParamsRunner.class) +public class TransformersTest { + + private final ClassLoader classLoader = getClass().getClassLoader(); + + /** Converts a CSV line to Avro and back, expecting the original CSV text. */ + @Test + public void validateCSVtoAvroPair() throws Exception { + String data = "John Dow|13|blue"; + String fooSchemaText = "{\"namespace\": \"example.avro\", " + "\"type\": \"record\", " + "\"name\": \"User\", " + + "\"fields\": [ " + "{\"name\": \"name\", \"type\": \"string\"}, " + + "{\"name\": \"favorite_number\", \"type\": \"int\"}, " + + "{\"name\": \"favorite_color\", \"type\": \"string\"} " + "]" + "}"; + + Schema schema = new 
Schema.Parser().parse(fooSchemaText); + + // CSV -> AVRO -> CSV + ByteArrayInputStream in = new ByteArrayInputStream(data.getBytes(StandardCharsets.UTF_8)); + GenericRecord record = CSVUtils.read(in, '|', schema, '\"'); + ByteArrayOutputStream out = new ByteArrayOutputStream(); + AvroUtils.write(record, out); + byte[] avro = out.toByteArray(); + + in = new ByteArrayInputStream(avro); + record = AvroUtils.read(in, schema); + out = new ByteArrayOutputStream(); + CSVUtils.write(record, '|', out); + byte[] csv = out.toByteArray(); + assertEquals(data, new String(csv, StandardCharsets.UTF_8)); + } + + /** Converts a CSV line to JSON (checking the exact JSON text) and back, expecting the original CSV text. */ + @Test + public void validateCSVtoJsonPair() throws Exception { + String data = "John Dow|13|blue"; + String fooSchemaText = "{\"namespace\": \"example.avro\", " + "\"type\": \"record\", " + "\"name\": \"User\", " + + "\"fields\": [ " + "{\"name\": \"name\", \"type\": \"string\"}, " + + "{\"name\": \"favorite_number\", \"type\": \"int\"}, " + + "{\"name\": \"favorite_color\", \"type\": \"string\"} " + "]" + "}"; + + Schema schema = new Schema.Parser().parse(fooSchemaText); + + // CSV -> JSON -> CSV + ByteArrayInputStream in = new ByteArrayInputStream(data.getBytes(StandardCharsets.UTF_8)); + GenericRecord record = CSVUtils.read(in, '|', schema, '\"'); + ByteArrayOutputStream out = new ByteArrayOutputStream(); + JsonUtils.write(record, out); + byte[] json = out.toByteArray(); + + assertEquals("{\"name\":\"John Dow\",\"favorite_number\":13,\"favorite_color\":\"blue\"}", new String(json, StandardCharsets.UTF_8)); + + in = new ByteArrayInputStream(json); + record = JsonUtils.read(in, schema); + out = new ByteArrayOutputStream(); + CSVUtils.write(record, '|', out); + byte[] csv = out.toByteArray(); + assertEquals(data, new String(csv, StandardCharsets.UTF_8)); + } + + /** Converts a JSON document to Avro and back, expecting the original JSON text. */ + @Test + public void validateJsonToAvroPair() throws Exception { + String data = "{\"name\":\"John Dow\",\"favorite_number\":13,\"favorite_color\":\"blue\"}"; + String fooSchemaText = "{\"namespace\": \"example.avro\", " + "\"type\": \"record\", " + 
"\"name\": \"User\", " + + "\"fields\": [ " + "{\"name\": \"name\", \"type\": \"string\"}, " + + "{\"name\": \"favorite_number\", \"type\": \"int\"}, " + + "{\"name\": \"favorite_color\", \"type\": \"string\"} " + "]" + "}"; + + Schema schema = new Schema.Parser().parse(fooSchemaText); + + // JSON -> AVRO -> JSON + ByteArrayInputStream in = new ByteArrayInputStream(data.getBytes(StandardCharsets.UTF_8)); + GenericRecord record = JsonUtils.read(in, schema); + ByteArrayOutputStream out = new ByteArrayOutputStream(); + AvroUtils.write(record, out); + byte[] avro = out.toByteArray(); + + in = new ByteArrayInputStream(avro); + record = AvroUtils.read(in, schema); + out = new ByteArrayOutputStream(); + JsonUtils.write(record, out); + byte[] json = out.toByteArray(); + assertEquals(data, new String(json, StandardCharsets.UTF_8)); + } + + /** Runs CSV -> Avro -> CSV for each (input CSV, Avro schema, expected CSV) resource triple and compares the result with the expected text. */ + @Test + @Parameters({"input_csv/union_null_last_field_with_default.txt,input_avro/union_and_matching_defaults.txt,expected_ouput_csv/union_null_last_field_with_default.txt", + "input_csv/union_with_default.txt,input_avro/union_and_matching_defaults.txt,expected_ouput_csv/union_with_default.txt", + "input_csv/union_null_middle_field_with_default.txt,input_avro/union_and_matching_defaults.txt,expected_ouput_csv/union_null_middle_field_with_default.txt", + "input_csv/primitive_types.txt,input_avro/primitive_types_no_defaults.txt,expected_ouput_csv/primitive_types.txt", + "input_csv/primitive_types_with_matching_default.txt,input_avro/primitive_types_with_matching_default.txt,expected_ouput_csv/primitive_types_with_matching_default.txt", + "input_csv/decimal_logicalType.txt,input_avro/decimal_logicalType_valid_scale_with_no_default.txt,expected_ouput_csv/decimal_logicalType.txt", + "input_csv/decimal_logicalType.txt,input_avro/decimal_logicalType_invalid_scale_with_default.txt,expected_ouput_csv/decimal_logicalType_invalid_scale.txt", + 
"input_csv/decimal_logicalType_missing_value.txt,input_avro/decimal_logicalType_valid_scale_with_default.txt,expected_ouput_csv/decimal_logicalType_valid_scale_with_default.txt", + "input_csv/decimal_logicalType_missing_value.txt,input_avro/decimal_logicalType_invalid_scale_with_default.txt,expected_ouput_csv/decimal_logicalType_with_default.txt"}) + public void testCSVRoundtrip(final String inputCSVFileName, final String inputAvroSchema, final String expectedOutput) throws Exception { + + final String data = getResourceAsString(inputCSVFileName); + final String schemaText = getResourceAsString(inputAvroSchema); + final String result = getResourceAsString(expectedOutput); + + Schema schema = new Schema.Parser().parse(schemaText); + + + // CSV -> AVRO -> CSV + + ByteArrayInputStream in = new ByteArrayInputStream(data.getBytes(StandardCharsets.UTF_8)); + GenericRecord record = CSVUtils.read(in, '|', schema, '\"'); + ByteArrayOutputStream out = new ByteArrayOutputStream(); + AvroUtils.write(record, out); + byte[] avro = out.toByteArray(); + + in = new ByteArrayInputStream(avro); + record = AvroUtils.read(in, schema); + out = new ByteArrayOutputStream(); + CSVUtils.write(record, '|', out); + byte[] csv = out.toByteArray(); + assertEquals(result, new String(csv, StandardCharsets.UTF_8)); + + } + + /** Expects an IllegalArgumentException for each (input CSV, Avro schema) resource pair; the test now fails if the read completes without one. */ + @Test + @Parameters({"input_csv/union_with_missing_value.txt,input_avro/union_and_mismatch_defaults.txt", + "input_csv/primitive_types_with_matching_default.txt,input_avro/primitive_types_with_mismatch_default.txt"}) + public void testCSVMismatchDefaults(final String inputCSVFileName, final String inputAvroSchema) { + + try { + final String data = getResourceAsString(inputCSVFileName); + + final String schemaText = getResourceAsString(inputAvroSchema); + + Schema schema = new Schema.Parser().parse(schemaText); + + ByteArrayInputStream in = new ByteArrayInputStream(data.getBytes(StandardCharsets.UTF_8)); + CSVUtils.read(in, '|', schema, '\"'); + assertTrue("Expected IllegalArgumentException for " + inputCSVFileName + " with schema " + inputAvroSchema, false); + }catch (IOException ioe){ + assertTrue("Unexpected IOException: " + ioe, false); + 
}catch(IllegalArgumentException iae){ + /* expected outcome: nothing to assert */ + } + + } + + /** + * Simple wrapper around getting the test resource file that is used by the above test cases + * + * @param fileName - the filename of the file to read + * @return A string that contains the body of the file, decoded as UTF-8. + * @throws IOException - if an error occurs reading the file. + */ + private String getResourceAsString(String fileName) throws IOException { + return new String(Files.readAllBytes(FileSystems.getDefault().getPath(classLoader.getResource(fileName).getPath())), StandardCharsets.UTF_8); + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/6a1854c9/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/expected_ouput_csv/decimal_logicalType.txt ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/expected_ouput_csv/decimal_logicalType.txt b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/expected_ouput_csv/decimal_logicalType.txt new file mode 100644 index 0000000..1a53f85 --- /dev/null +++ b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/expected_ouput_csv/decimal_logicalType.txt @@ -0,0 +1 @@ +"fake_transactionid"|11234567.890 \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/6a1854c9/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/expected_ouput_csv/decimal_logicalType_invalid_scale.txt ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/expected_ouput_csv/decimal_logicalType_invalid_scale.txt b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/expected_ouput_csv/decimal_logicalType_invalid_scale.txt new file mode 100644 index 0000000..9506ad4 --- /dev/null +++ 
b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/expected_ouput_csv/decimal_logicalType_invalid_scale.txt @@ -0,0 +1 @@ +"fake_transactionid"|11234567.89 \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/6a1854c9/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/expected_ouput_csv/decimal_logicalType_valid_scale_with_default.txt ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/expected_ouput_csv/decimal_logicalType_valid_scale_with_default.txt b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/expected_ouput_csv/decimal_logicalType_valid_scale_with_default.txt new file mode 100644 index 0000000..2309e71 --- /dev/null +++ b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/expected_ouput_csv/decimal_logicalType_valid_scale_with_default.txt @@ -0,0 +1 @@ +"fake_transactionid"|0.000 \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/6a1854c9/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/expected_ouput_csv/decimal_logicalType_with_default.txt ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/expected_ouput_csv/decimal_logicalType_with_default.txt b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/expected_ouput_csv/decimal_logicalType_with_default.txt new file mode 100644 index 0000000..3a9689c --- /dev/null +++ b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/expected_ouput_csv/decimal_logicalType_with_default.txt @@ -0,0 +1 @@ +"fake_transactionid"|0 \ No newline at end of file 
http://git-wip-us.apache.org/repos/asf/nifi/blob/6a1854c9/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/expected_ouput_csv/primitive_types.txt ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/expected_ouput_csv/primitive_types.txt b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/expected_ouput_csv/primitive_types.txt new file mode 100644 index 0000000..77f353f --- /dev/null +++ b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/expected_ouput_csv/primitive_types.txt @@ -0,0 +1 @@ +"this is a simple string."|10|21474836470|1.7976931348623157E308|true \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/6a1854c9/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/expected_ouput_csv/primitive_types_with_matching_default.txt ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/expected_ouput_csv/primitive_types_with_matching_default.txt b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/expected_ouput_csv/primitive_types_with_matching_default.txt new file mode 100644 index 0000000..095f81e --- /dev/null +++ b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/expected_ouput_csv/primitive_types_with_matching_default.txt @@ -0,0 +1 @@ +"default_string"|1234|21474836470|1.7976931348623157E308|true \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/6a1854c9/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/expected_ouput_csv/union_null_last_field_with_default.txt ---------------------------------------------------------------------- diff --git 
a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/expected_ouput_csv/union_null_last_field_with_default.txt b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/expected_ouput_csv/union_null_last_field_with_default.txt new file mode 100644 index 0000000..83cbf75 --- /dev/null +++ b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/expected_ouput_csv/union_null_last_field_with_default.txt @@ -0,0 +1 @@ +andrew|13| \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/6a1854c9/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/expected_ouput_csv/union_null_middle_field_with_default.txt ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/expected_ouput_csv/union_null_middle_field_with_default.txt b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/expected_ouput_csv/union_null_middle_field_with_default.txt new file mode 100644 index 0000000..1b03c97 --- /dev/null +++ b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/expected_ouput_csv/union_null_middle_field_with_default.txt @@ -0,0 +1 @@ +andrew|21474|blue \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/6a1854c9/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/expected_ouput_csv/union_with_default.txt ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/expected_ouput_csv/union_with_default.txt b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/expected_ouput_csv/union_with_default.txt new file mode 100644 index 0000000..9c7abb5 --- /dev/null +++ 
b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/expected_ouput_csv/union_with_default.txt @@ -0,0 +1 @@ +andrew|13|blue \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/6a1854c9/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_avro/decimal_logicalType_invalid_scale_with_default.txt ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_avro/decimal_logicalType_invalid_scale_with_default.txt b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_avro/decimal_logicalType_invalid_scale_with_default.txt new file mode 100644 index 0000000..54ba8b1 --- /dev/null +++ b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_avro/decimal_logicalType_invalid_scale_with_default.txt @@ -0,0 +1,16 @@ +{ + "name": "trx_table", + "type": "record", + "fields": [ + { + "name": "transactionid", + "type": ["string", "null"] + }, { + "name": "amount", + "type": "bytes", + "logicalType": "decimal", + "precision": 10, + "scale": 13, + "default": 0.0 + }] +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/6a1854c9/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_avro/decimal_logicalType_valid_scale_with_default.txt ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_avro/decimal_logicalType_valid_scale_with_default.txt b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_avro/decimal_logicalType_valid_scale_with_default.txt new file mode 100644 index 0000000..8385fb1 --- /dev/null +++ 
b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_avro/decimal_logicalType_valid_scale_with_default.txt @@ -0,0 +1,16 @@ +{ + "name": "trx_table", + "type": "record", + "fields": [ + { + "name": "transactionid", + "type": ["string", "null"] + }, { + "name": "amount", + "type": "bytes", + "logicalType": "decimal", + "precision": 10, + "scale": 3, + "default": 0.0 + }] +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/6a1854c9/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_avro/decimal_logicalType_valid_scale_with_no_default.txt ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_avro/decimal_logicalType_valid_scale_with_no_default.txt b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_avro/decimal_logicalType_valid_scale_with_no_default.txt new file mode 100644 index 0000000..9878590 --- /dev/null +++ b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_avro/decimal_logicalType_valid_scale_with_no_default.txt @@ -0,0 +1,15 @@ +{ + "name": "trx_table", + "type": "record", + "fields": [ + { + "name": "transactionid", + "type": ["string", "null"] + }, { + "name": "amount", + "type": "bytes", + "logicalType": "decimal", + "precision": 10, + "scale": 3 + }] +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/6a1854c9/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_avro/primitive_types_no_defaults.txt ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_avro/primitive_types_no_defaults.txt 
b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_avro/primitive_types_no_defaults.txt new file mode 100644 index 0000000..934a53c --- /dev/null +++ b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_avro/primitive_types_no_defaults.txt @@ -0,0 +1,11 @@ +{ + "type":"record", + "name":"basic_primitive_type_check", + "fields":[ + {"name":"string","type":"string"}, + {"name":"integer","type":"int"}, + {"name":"long","type":"long"}, + {"name":"double","type":"double"}, + {"name":"boolean","type":"boolean"} + ] +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/6a1854c9/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_avro/primitive_types_union_with_defaults.txt ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_avro/primitive_types_union_with_defaults.txt b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_avro/primitive_types_union_with_defaults.txt new file mode 100644 index 0000000..abc80ca --- /dev/null +++ b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_avro/primitive_types_union_with_defaults.txt @@ -0,0 +1,11 @@ +{ + "type":"record", + "name":"basic_primitive_type_check", + "fields":[ + {"name":"string","type":["null","string"],"default":null}, + {"name":"integer","type":["null","int"],"default":null}, + {"name":"long","type":["null","long"],"default":null}, + {"name":"double","type":["null","double"],"default":null}, + {"name":"boolean","type":["null","boolean"],"default":null} + ] + } http://git-wip-us.apache.org/repos/asf/nifi/blob/6a1854c9/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_avro/primitive_types_with_matching_default.txt 
---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_avro/primitive_types_with_matching_default.txt b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_avro/primitive_types_with_matching_default.txt new file mode 100644 index 0000000..b3ea951 --- /dev/null +++ b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_avro/primitive_types_with_matching_default.txt @@ -0,0 +1,11 @@ +{ + "type":"record", + "name":"basic_primitive_type_check", + "fields":[ + {"name":"string","type":"string","default":"default_string"}, + {"name":"integer","type":"int","default":1234}, + {"name":"long","type":"long","default":21474836470}, + {"name":"double","type":"double","default":1.7976931348623157E308}, + {"name":"boolean","type":"boolean","default":true} + ] + } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/6a1854c9/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_avro/primitive_types_with_mismatch_default.txt ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_avro/primitive_types_with_mismatch_default.txt b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_avro/primitive_types_with_mismatch_default.txt new file mode 100644 index 0000000..e8f0e28 --- /dev/null +++ b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_avro/primitive_types_with_mismatch_default.txt @@ -0,0 +1,11 @@ +{ + "type":"record", + "name":"basic_primitive_type_check", + "fields":[ + {"name":"string","type":"string","default":1234}, + {"name":"integer","type":"int","default":"mismatch_int"}, + {"name":"long","type":"long","default":"mismatch_long"}, + 
{"name":"double","type":"double","default":"mismatch_double"}, + {"name":"boolean","type":"boolean","default":"mismatch_boolean"} + ] + } http://git-wip-us.apache.org/repos/asf/nifi/blob/6a1854c9/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_avro/union_and_matching_defaults.txt ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_avro/union_and_matching_defaults.txt b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_avro/union_and_matching_defaults.txt new file mode 100644 index 0000000..442a3a4 --- /dev/null +++ b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_avro/union_and_matching_defaults.txt @@ -0,0 +1,18 @@ +{ + "namespace": "example.avro", + "type": "record", + "name": "User", + "fields": [{ + "name": "name", + "type": "string", + "default": "default_name" + }, { + "name": "favorite_number", + "type": "int", + "default": 21474 + }, { + "name": "favorite_color", + "type": ["null", "string"], + "default": null + }] +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/6a1854c9/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_avro/union_and_mismatch_defaults.txt ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_avro/union_and_mismatch_defaults.txt b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_avro/union_and_mismatch_defaults.txt new file mode 100644 index 0000000..5222074 --- /dev/null +++ b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_avro/union_and_mismatch_defaults.txt @@ -0,0 +1,18 @@ +{ + "namespace": "example.avro", + "type": "record", + "name": 
"User", + "fields": [{ + "name": "name", + "type": "string", + "default": "default_name" + }, { + "name": "favorite_number", + "type": "int", + "default": "mismatched_int_default" + }, { + "name": "favorite_color", + "type": ["null", "string"], + "default": null + }] +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/6a1854c9/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_csv/decimal_logicalType.txt ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_csv/decimal_logicalType.txt b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_csv/decimal_logicalType.txt new file mode 100644 index 0000000..1a53f85 --- /dev/null +++ b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_csv/decimal_logicalType.txt @@ -0,0 +1 @@ +"fake_transactionid"|11234567.890 \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/6a1854c9/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_csv/decimal_logicalType_missing_value.txt ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_csv/decimal_logicalType_missing_value.txt b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_csv/decimal_logicalType_missing_value.txt new file mode 100644 index 0000000..1ee2a9b --- /dev/null +++ b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_csv/decimal_logicalType_missing_value.txt @@ -0,0 +1 @@ +"fake_transactionid"| \ No newline at end of file 
http://git-wip-us.apache.org/repos/asf/nifi/blob/6a1854c9/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_csv/primitive_types.txt ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_csv/primitive_types.txt b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_csv/primitive_types.txt new file mode 100644 index 0000000..77f353f --- /dev/null +++ b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_csv/primitive_types.txt @@ -0,0 +1 @@ +"this is a simple string."|10|21474836470|1.7976931348623157E308|true \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/6a1854c9/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_csv/primitive_types_with_matching_default.txt ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_csv/primitive_types_with_matching_default.txt b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_csv/primitive_types_with_matching_default.txt new file mode 100644 index 0000000..b60c01b --- /dev/null +++ b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_csv/primitive_types_with_matching_default.txt @@ -0,0 +1 @@ +"default_string"||21474836470||true \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/6a1854c9/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_csv/union_null_last_field_with_default.txt ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_csv/union_null_last_field_with_default.txt 
b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_csv/union_null_last_field_with_default.txt new file mode 100644 index 0000000..83cbf75 --- /dev/null +++ b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_csv/union_null_last_field_with_default.txt @@ -0,0 +1 @@ +andrew|13| \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/6a1854c9/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_csv/union_null_middle_field_with_default.txt ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_csv/union_null_middle_field_with_default.txt b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_csv/union_null_middle_field_with_default.txt new file mode 100644 index 0000000..5a706ac --- /dev/null +++ b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_csv/union_null_middle_field_with_default.txt @@ -0,0 +1 @@ +andrew||blue \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/6a1854c9/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_csv/union_with_default.txt ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_csv/union_with_default.txt b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_csv/union_with_default.txt new file mode 100644 index 0000000..9c7abb5 --- /dev/null +++ b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_csv/union_with_default.txt @@ -0,0 +1 @@ +andrew|13|blue \ No newline at end of file 
http://git-wip-us.apache.org/repos/asf/nifi/blob/6a1854c9/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_csv/union_with_missing_value.txt ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_csv/union_with_missing_value.txt b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_csv/union_with_missing_value.txt new file mode 100644 index 0000000..5a706ac --- /dev/null +++ b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_csv/union_with_missing_value.txt @@ -0,0 +1 @@ +andrew||blue \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/6a1854c9/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/pom.xml b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/pom.xml new file mode 100644 index 0000000..d99dc64 --- /dev/null +++ b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/pom.xml @@ -0,0 +1,48 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- Copyright 2016 Hortonworks, Inc. All rights reserved. Hortonworks, Inc. + licenses this file to you under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. You may + obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software distributed + under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES + OR CONDITIONS OF ANY KIND, either express or implied. See the License for + the specific language governing permissions and limitations under the License. 
+ See the associated NOTICE file for additional information regarding copyright + ownership. --> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-registry-bundle</artifactId> + <version>1.2.0-SNAPSHOT</version> + </parent> + + <artifactId>nifi-registry-service</artifactId> + <packaging>jar</packaging> + + <dependencies> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-api</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-utils</artifactId> + </dependency> + <dependency> + <groupId>org.apache.commons</groupId> + <artifactId>commons-lang3</artifactId> + </dependency> + <dependency> + <groupId>commons-io</groupId> + <artifactId>commons-io</artifactId> + </dependency> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>test</scope> + </dependency> + </dependencies> +</project> http://git-wip-us.apache.org/repos/asf/nifi/blob/6a1854c9/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/main/java/org/apache/nifi/schemaregistry/services/SchemaRegistry.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/main/java/org/apache/nifi/schemaregistry/services/SchemaRegistry.java b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/main/java/org/apache/nifi/schemaregistry/services/SchemaRegistry.java new file mode 100644 index 0000000..fd5d0c5 --- /dev/null +++ b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/main/java/org/apache/nifi/schemaregistry/services/SchemaRegistry.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * 
contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.schemaregistry.services; + +import java.util.Properties; + +import org.apache.nifi.controller.ControllerService; + +/** + * Represents {@link ControllerService} strategy to expose internal and/or + * integrate with external Schema Registry + */ +public interface SchemaRegistry extends ControllerService, AutoCloseable { + + public static final String SCHEMA_NAME_ATTR = "schema.name"; + + + /** + * Retrieves and returns the textual representation of the schema based on + * the provided name of the schema available in Schema Registry. Will throw + * a runtime exception if the schema cannot be found. + */ + String retrieveSchemaText(String schemaName); + + /** + * Retrieves and returns the textual representation of the schema based on + * the provided name of the schema available in Schema Registry and optional + * additional attributes. Will throw a runtime exception if the schema cannot + * be found. 
+ */ + String retrieveSchemaText(String schemaName, Properties attributes); +} http://git-wip-us.apache.org/repos/asf/nifi/blob/6a1854c9/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/main/java/org/apache/nifi/schemaregistry/services/SimpleKeyValueSchemaRegistry.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/main/java/org/apache/nifi/schemaregistry/services/SimpleKeyValueSchemaRegistry.java b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/main/java/org/apache/nifi/schemaregistry/services/SimpleKeyValueSchemaRegistry.java new file mode 100644 index 0000000..aaedea2 --- /dev/null +++ b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/main/java/org/apache/nifi/schemaregistry/services/SimpleKeyValueSchemaRegistry.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.schemaregistry.services; + +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.stream.Collectors; + +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnDisabled; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.reporting.InitializationException; + +@Tags({ "schema", "registry", "avro", "json", "csv" }) +@CapabilityDescription("Provides a service for registering and accessing schemas. You can register schema " + + "as a dynamic property where 'name' represents the schema name and 'value' represents the textual " + + "representation of the actual schema.") +public class SimpleKeyValueSchemaRegistry extends AbstractControllerService implements SchemaRegistry { + + private static final List<PropertyDescriptor> propertyDescriptors; + + static { + propertyDescriptors = Collections.emptyList(); + } + + private final Map<String, String> schemaNameToSchemaMap; + + public SimpleKeyValueSchemaRegistry() { + this.schemaNameToSchemaMap = new HashMap<>(); + } + + @OnEnabled + public void enable(ConfigurationContext configuratiponContext) throws InitializationException { + this.schemaNameToSchemaMap.putAll(configuratiponContext.getProperties().entrySet().stream() + .filter(propEntry -> propEntry.getKey().isDynamic()) + .collect(Collectors.toMap(propEntry -> propEntry.getKey().getName(), propEntry -> propEntry.getValue()))); + } + + /** + * + */ + @Override + public String retrieveSchemaText(String schemaName) { + if (!this.schemaNameToSchemaMap.containsKey(schemaName)) { + 
throw new IllegalArgumentException("Failed to find schema; Name: '" + schemaName + "."); + } else { + return this.schemaNameToSchemaMap.get(schemaName); + } + } + + @Override + public String retrieveSchemaText(String schemaName, Properties attributes) { + throw new UnsupportedOperationException("This version of schema registry does not " + + "support this operation, since schemas are only identofied by name."); + } + + @Override + @OnDisabled + public void close() throws Exception { + this.schemaNameToSchemaMap.clear(); + } + + @Override + protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) { + return new PropertyDescriptor.Builder().required(false).name(propertyDescriptorName) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR).dynamic(true).expressionLanguageSupported(true) + .build(); + } + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return propertyDescriptors; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/6a1854c9/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService new file mode 100644 index 0000000..1775b76 --- /dev/null +++ b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService @@ -0,0 +1,15 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. 
+# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +org.apache.nifi.schemaregistry.services.SimpleKeyValueSchemaRegistry \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/6a1854c9/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/test/java/org/apache/nifi/schemaregistry/services/SimpleKeyValueSchemaRegistryTest.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/test/java/org/apache/nifi/schemaregistry/services/SimpleKeyValueSchemaRegistryTest.java b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/test/java/org/apache/nifi/schemaregistry/services/SimpleKeyValueSchemaRegistryTest.java new file mode 100644 index 0000000..29179ab --- /dev/null +++ b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/test/java/org/apache/nifi/schemaregistry/services/SimpleKeyValueSchemaRegistryTest.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.schemaregistry.services; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.schemaregistry.services.SchemaRegistry; +import org.apache.nifi.schemaregistry.services.SimpleKeyValueSchemaRegistry; +import org.junit.Test; + +/** Unit test for {@link SimpleKeyValueSchemaRegistry}. */ public class SimpleKeyValueSchemaRegistryTest { + + /** Enables a {@link SimpleKeyValueSchemaRegistry} with a mocked {@link ConfigurationContext} whose properties hold a dynamic "fooSchema" entry (JSON text of a record schema) and a non-dynamic "barSchema" entry with an empty value; asserts that the "fooSchema" text is returned unchanged by {@code retrieveSchemaText} and that looking up "barSchema" throws. */ @Test + public void validateSchemaRegistrationFromrDynamicProperties() throws Exception { + String schemaName = "fooSchema"; + ConfigurationContext configContext = mock(ConfigurationContext.class); + Map<PropertyDescriptor, String> properties = new HashMap<>(); + PropertyDescriptor fooSchema = new PropertyDescriptor.Builder() + .name(schemaName) + .dynamic(true) + .build(); + String fooSchemaText = "{\"namespace\": \"example.avro\", " + "\"type\": \"record\", " + "\"name\": \"User\", " + + "\"fields\": [ " + "{\"name\": \"name\", \"type\": [\"string\", \"null\"]}, " + + "{\"name\": \"favorite_number\", \"type\": [\"int\", \"null\"]}, " + + "{\"name\": \"foo\", \"type\": [\"int\", \"null\"]}, " + + "{\"name\": \"favorite_color\", \"type\": [\"string\", \"null\"]} " + "]" + "}"; + PropertyDescriptor barSchema = new PropertyDescriptor.Builder() + .name("barSchema") + .dynamic(false) + .build(); + properties.put(fooSchema, fooSchemaText); +
properties.put(barSchema, ""); + when(configContext.getProperties()).thenReturn(properties); + SchemaRegistry delegate = new SimpleKeyValueSchemaRegistry(); + ((SimpleKeyValueSchemaRegistry)delegate).enable(configContext); + + String locatedSchemaText = delegate.retrieveSchemaText(schemaName); + assertEquals(fooSchemaText, locatedSchemaText); + try { + locatedSchemaText = delegate.retrieveSchemaText("barSchema"); + fail(); /* reached only when the lookup above did not throw */ + } catch (Exception e) { + // expected: retrieving "barSchema" must throw (presumably because only dynamic properties are registered as schemas; confirm against SimpleKeyValueSchemaRegistry.enable). fail() raises AssertionError, which is not an Exception, so this catch does not hide a failed expectation. + } + delegate.close(); + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/6a1854c9/nifi-nar-bundles/nifi-registry-bundle/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-registry-bundle/pom.xml b/nifi-nar-bundles/nifi-registry-bundle/pom.xml new file mode 100644 index 0000000..fa6d30f --- /dev/null +++ b/nifi-nar-bundles/nifi-registry-bundle/pom.xml @@ -0,0 +1,42 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor + license agreements. See the NOTICE file distributed with this work for additional + information regarding copyright ownership. The ASF licenses this file to + You under the Apache License, Version 2.0 (the "License"); you may not use + this file except in compliance with the License. You may obtain a copy of + the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required + by applicable law or agreed to in writing, software distributed under the + License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS + OF ANY KIND, either express or implied. See the License for the specific + language governing permissions and limitations under the License.
--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-nar-bundles</artifactId> + <version>1.2.0-SNAPSHOT</version> + </parent> + <artifactId>nifi-registry-bundle</artifactId> + <packaging>pom</packaging> + <description>A bundle of processors that rely on external service to obtain schema.</description> + + <properties> + <commons-lang3.version>3.0</commons-lang3.version> + </properties> + + <modules> + <module>nifi-registry-processors</module> + <module>nifi-registry-service</module> + <module>nifi-registry-nar</module> + </modules> + <dependencyManagement> + <dependencies> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-registry-processors</artifactId> + <version>1.2.0-SNAPSHOT</version> + </dependency> + </dependencies> + </dependencyManagement> +</project> http://git-wip-us.apache.org/repos/asf/nifi/blob/6a1854c9/nifi-nar-bundles/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/pom.xml b/nifi-nar-bundles/pom.xml index f0164af..02785d5 100755 --- a/nifi-nar-bundles/pom.xml +++ b/nifi-nar-bundles/pom.xml @@ -74,6 +74,7 @@ <module>nifi-websocket-bundle</module> <module>nifi-tcp-bundle</module> <module>nifi-gcp-bundle</module> + <module>nifi-registry-bundle</module> </modules> <dependencyManagement> http://git-wip-us.apache.org/repos/asf/nifi/blob/6a1854c9/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index c968c6f..a07e7ff 100644 --- a/pom.xml +++ b/pom.xml @@ -1146,6 +1146,12 @@ language governing permissions and limitations under the License. 
--> </dependency> <dependency> <groupId>org.apache.nifi</groupId> + <artifactId>nifi-registry-nar</artifactId> + <version>1.2.0-SNAPSHOT</version> + <type>nar</type> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> <artifactId>nifi-mqtt-nar</artifactId> <version>1.2.0-SNAPSHOT</version> <type>nar</type>
