http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/TransformJsonToAvro.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/TransformJsonToAvro.java b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/TransformJsonToAvro.java deleted file mode 100644 index f54a4b5..0000000 --- a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/TransformJsonToAvro.java +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.schemaregistry.processors; - -import java.io.InputStream; -import java.io.OutputStream; -import java.util.Collections; -import java.util.Map; - -import org.apache.avro.Schema; -import org.apache.avro.generic.GenericRecord; -import org.apache.nifi.annotation.behavior.InputRequirement; -import org.apache.nifi.annotation.documentation.CapabilityDescription; -import org.apache.nifi.annotation.documentation.Tags; -import org.apache.nifi.flowfile.attributes.CoreAttributes; - -@Tags({ "registry", "schema", "avro", "json", "transform" }) -@CapabilityDescription("Transforms JSON content of the Flow File to Avro using the schema provided by the Schema Registry Service.") -@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) -public final class TransformJsonToAvro extends AbstractContentTransformer { - - /** - * - */ - @Override - protected Map<String, String> transform(InputStream in, OutputStream out, InvocationContextProperties contextProperties, Schema schema) { - GenericRecord avroRecord = JsonUtils.read(in, schema); - AvroUtils.write(avroRecord, out); - return Collections.singletonMap(CoreAttributes.MIME_TYPE.key(), "binary/avro"); - } -} \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/TransformJsonToCSV.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/TransformJsonToCSV.java b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/TransformJsonToCSV.java deleted file mode 100644 index c026570..0000000 --- a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/TransformJsonToCSV.java +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.schemaregistry.processors; - -import java.io.InputStream; -import java.io.OutputStream; -import java.util.Collections; -import java.util.Map; - -import org.apache.avro.Schema; -import org.apache.avro.generic.GenericRecord; -import org.apache.nifi.annotation.behavior.InputRequirement; -import org.apache.nifi.annotation.documentation.CapabilityDescription; -import org.apache.nifi.annotation.documentation.Tags; -import org.apache.nifi.flowfile.attributes.CoreAttributes; - -@Tags({ "registry", "schema", "csv", "json", "transform" }) -@CapabilityDescription("Transforms JSON content of the Flow File to CSV using the schema provided by the Schema Registry Service.") -@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) -public final class TransformJsonToCSV extends AbstractCSVTransformer { - - /** - * - */ - @Override - protected Map<String, String> transform(InputStream in, OutputStream out, InvocationContextProperties contextProperties, Schema schema) { - GenericRecord avroRecord = JsonUtils.read(in, schema); - CSVUtils.write(avroRecord, this.delimiter, out); - return Collections.singletonMap(CoreAttributes.MIME_TYPE.key(), "text/csv"); - } -} http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor deleted file mode 100644 index 0bb067e..0000000 --- a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor +++ /dev/null @@ -1,21 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -org.apache.nifi.schemaregistry.processors.TransformJsonToAvro -org.apache.nifi.schemaregistry.processors.TransformAvroToJson -org.apache.nifi.schemaregistry.processors.TransformCSVToAvro -org.apache.nifi.schemaregistry.processors.TransformCSVToJson -org.apache.nifi.schemaregistry.processors.TransformAvroToCSV -org.apache.nifi.schemaregistry.processors.TransformJsonToCSV -org.apache.nifi.schemaregistry.processors.ExtractAvroFields \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/java/org/apache/nifi/schemaregistry/processors/TransformersTest.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/java/org/apache/nifi/schemaregistry/processors/TransformersTest.java b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/java/org/apache/nifi/schemaregistry/processors/TransformersTest.java deleted file mode 100644 index 058af62..0000000 --- a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/java/org/apache/nifi/schemaregistry/processors/TransformersTest.java +++ /dev/null @@ -1,220 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.schemaregistry.processors; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.math.BigDecimal; -import java.nio.charset.StandardCharsets; -import java.nio.file.FileSystems; -import java.nio.file.Files; -import java.text.DecimalFormat; -import java.text.NumberFormat; - -import org.apache.avro.Schema; -import org.apache.avro.generic.GenericRecord; -import org.junit.Test; -import org.junit.runner.RunWith; - -import junitparams.JUnitParamsRunner; -import junitparams.Parameters; -import static org.junit.Assume.assumeFalse; - -@RunWith(JUnitParamsRunner.class) -public class TransformersTest { - - private final ClassLoader classLoader = getClass().getClassLoader(); - - @Test - public void validateCSVtoAvroPair() throws Exception { - String data = "John Dow|13|blue"; - String fooSchemaText = "{\"namespace\": \"example.avro\", " + "\"type\": \"record\", " + "\"name\": \"User\", " - + "\"fields\": [ " + "{\"name\": \"name\", \"type\": \"string\"}, " - + "{\"name\": \"favorite_number\", \"type\": \"int\"}, " - + "{\"name\": \"favorite_color\", \"type\": \"string\"} " + "]" + "}"; - - Schema schema = new Schema.Parser().parse(fooSchemaText); - - // CSV -> AVRO -> CSV - ByteArrayInputStream in = new ByteArrayInputStream(data.getBytes()); - GenericRecord record = CSVUtils.read(in, '|', schema, '\"'); - ByteArrayOutputStream out = new ByteArrayOutputStream(); - AvroUtils.write(record, out); - byte[] avro = out.toByteArray(); - - in = new ByteArrayInputStream(avro); - record = AvroUtils.read(in, schema); - out = new ByteArrayOutputStream(); - CSVUtils.write(record, '|', out); - byte[] csv = out.toByteArray(); - assertEquals(data, new String(csv, StandardCharsets.UTF_8)); - } - - @Test - public void validateCSVtoJsonPair() throws Exception { - String data = "John Dow|13|blue"; - String fooSchemaText = "{\"namespace\": \"example.avro\", " + "\"type\": \"record\", " + "\"name\": \"User\", " - + "\"fields\": [ " + "{\"name\": \"name\", \"type\": \"string\"}, " - + "{\"name\": \"favorite_number\", \"type\": \"int\"}, " - + "{\"name\": \"favorite_color\", \"type\": \"string\"} " + "]" + "}"; - - Schema schema = new Schema.Parser().parse(fooSchemaText); - - // CSV -> JSON -> CSV - ByteArrayInputStream in = new ByteArrayInputStream(data.getBytes()); - GenericRecord record = CSVUtils.read(in, '|', schema, '\"'); - ByteArrayOutputStream out = new ByteArrayOutputStream(); - JsonUtils.write(record, out); - byte[] json = out.toByteArray(); - - assertEquals("{\"name\":\"John Dow\",\"favorite_number\":13,\"favorite_color\":\"blue\"}", new String(json, StandardCharsets.UTF_8)); - - in = new ByteArrayInputStream(json); - record = JsonUtils.read(in, schema); - out = new ByteArrayOutputStream(); - CSVUtils.write(record, '|', out); - byte[] csv = out.toByteArray(); - assertEquals(data, new String(csv, StandardCharsets.UTF_8)); - } - - @Test - public void validateJsonToAvroPair() throws Exception { - String data = "{\"name\":\"John Dow\",\"favorite_number\":13,\"favorite_color\":\"blue\"}"; - String fooSchemaText = "{\"namespace\": \"example.avro\", " + "\"type\": \"record\", " + "\"name\": \"User\", " - + "\"fields\": [ " + "{\"name\": \"name\", \"type\": \"string\"}, " - + "{\"name\": \"favorite_number\", \"type\": \"int\"}, " - + "{\"name\": \"favorite_color\", \"type\": \"string\"} " + "]" + "}"; - - Schema schema = new Schema.Parser().parse(fooSchemaText); - - // JSON -> AVRO -> JSON - ByteArrayInputStream in = new ByteArrayInputStream(data.getBytes()); - GenericRecord record = JsonUtils.read(in, schema); - ByteArrayOutputStream out = new ByteArrayOutputStream(); - AvroUtils.write(record, out); - byte[] avro = out.toByteArray(); - - in = new ByteArrayInputStream(avro); - record = AvroUtils.read(in, schema); - out = new ByteArrayOutputStream(); - JsonUtils.write(record, out); - byte[] csv = out.toByteArray(); - assertEquals(data, new String(csv, StandardCharsets.UTF_8)); - } - - @Test - @Parameters({"input_csv/union_null_last_field_with_default.txt,input_avro/union_and_matching_defaults.txt,expected_ouput_csv/union_null_last_field_with_default.txt", - "input_csv/union_with_default.txt,input_avro/union_and_matching_defaults.txt,expected_ouput_csv/union_with_default.txt", - "input_csv/union_null_middle_field_with_default.txt,input_avro/union_and_matching_defaults.txt,expected_ouput_csv/union_null_middle_field_with_default.txt", - "input_csv/primitive_types.txt,input_avro/primitive_types_no_defaults.txt,expected_ouput_csv/primitive_types.txt", - "input_csv/primitive_types_with_matching_default.txt,input_avro/primitive_types_with_matching_default.txt,expected_ouput_csv/primitive_types_with_matching_default.txt", - "input_csv/decimal_logicalType_missing_value.txt,input_avro/decimal_logicalType_invalid_scale_with_default.txt,expected_ouput_csv/decimal_logicalType_with_default.txt"}) - public void testCSVRoundtrip(final String inputCSVFileName, final String inputAvroSchema, final String expectedOuput) throws Exception { - assumeFalse(isWindowsEnvironment()); - final String data = getResourceAsString(inputCSVFileName); - final String schemaText = getResourceAsString(inputAvroSchema); - final String result = getResourceAsString(expectedOuput); - csvRoundTrip(data, schemaText, result); - } - - private boolean isWindowsEnvironment() { - return System.getProperty("os.name").toLowerCase().startsWith("windows"); - } - - @Test - @Parameters({"input_csv/union_with_missing_value.txt,input_avro/union_and_mismatch_defaults.txt", - "input_csv/primitive_types_with_matching_default.txt,input_avro/primitive_types_with_mismatch_default.txt"}) - public void testCSVMismatchDefaults(final String inputCSVFileName, final String inputAvroSchema) { - assumeFalse(isWindowsEnvironment()); - try { - final String data = getResourceAsString(inputCSVFileName); - final String schemaText = getResourceAsString(inputAvroSchema); - Schema schema = new Schema.Parser().parse(schemaText); - - ByteArrayInputStream in = new ByteArrayInputStream(data.getBytes()); - CSVUtils.read(in, '|', schema, '\"'); - }catch (IOException ioe){ - assertTrue(false); - }catch(IllegalArgumentException iae){ - assertTrue(true); - } - } - - @Test - public void testCSVRoundTrip() throws IOException { - assumeFalse(isWindowsEnvironment()); - NumberFormat numberFormat = DecimalFormat.getInstance(); - numberFormat.setGroupingUsed(false); - ((DecimalFormat) numberFormat).setParseBigDecimal(true); - - //"input_csv/decimal_logicalType.txt,input_avro/decimal_logicalType_invalid_scale_with_default.txt,expected_ouput_csv/decimal_logicalType_invalid_scale.txt", - String decimalLogicalType = "\"fake_transactionid\"|" + numberFormat.format(new BigDecimal(11234567.89)); - String data = getResourceAsString("input_csv/decimal_logicalType.txt"); - String schemaText = getResourceAsString("input_avro/decimal_logicalType_invalid_scale_with_default.txt"); - csvRoundTrip(data, schemaText, decimalLogicalType); - - // needs to be set now because scale < precision - numberFormat.setMaximumIntegerDigits(10); - numberFormat.setMaximumFractionDigits(3); - numberFormat.setMinimumFractionDigits(3); - - //"input_csv/decimal_logicalType.txt,input_avro/decimal_logicalType_valid_scale_with_no_default.txt,expected_ouput_csv/decimal_logicalType.txt", - decimalLogicalType = "\"fake_transactionid\"|" + numberFormat.format(new BigDecimal(11234567.890)); - data = getResourceAsString("input_csv/decimal_logicalType.txt"); - schemaText = getResourceAsString("input_avro/decimal_logicalType_valid_scale_with_no_default.txt"); - - //"input_csv/decimal_logicalType_missing_value.txt,input_avro/decimal_logicalType_valid_scale_with_default.txt,expected_ouput_csv/decimal_logicalType_valid_scale_with_default.txt", - decimalLogicalType = "\"fake_transactionid\"|" + numberFormat.format(new BigDecimal(0.000)); - data = getResourceAsString("input_csv/decimal_logicalType_missing_value.txt"); - schemaText = getResourceAsString("input_avro/decimal_logicalType_valid_scale_with_default.txt"); - csvRoundTrip(data, schemaText, decimalLogicalType); - } - - private void csvRoundTrip(final String data, final String schemaText, final String result) { - Schema schema = new Schema.Parser().parse(schemaText); - - // CSV -> AVRO -> CSV - ByteArrayInputStream in = new ByteArrayInputStream(data.getBytes()); - GenericRecord record = CSVUtils.read(in, '|', schema, '\"'); - ByteArrayOutputStream out = new ByteArrayOutputStream(); - AvroUtils.write(record, out); - byte[] avro = out.toByteArray(); - - in = new ByteArrayInputStream(avro); - record = AvroUtils.read(in, schema); - out = new ByteArrayOutputStream(); - CSVUtils.write(record, '|', out); - byte[] csv = out.toByteArray(); - assertEquals(result, new String(csv, StandardCharsets.UTF_8)); - } - - /** - * Simple wrapper around getting the test resource file that is used by the above test cases - * - * @param fileName - the filename of the file to read - * @return A string that contains the body of the file. - * @throws IOException - if an error occurs reading the file. - */ - private String getResourceAsString(String fileName) throws IOException { - return new String(Files.readAllBytes(FileSystems.getDefault().getPath(classLoader.getResource(fileName).getPath()))); - } - -} http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/expected_ouput_csv/decimal_logicalType.txt ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/expected_ouput_csv/decimal_logicalType.txt b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/expected_ouput_csv/decimal_logicalType.txt deleted file mode 100644 index 1a53f85..0000000 --- a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/expected_ouput_csv/decimal_logicalType.txt +++ /dev/null @@ -1 +0,0 @@ -"fake_transactionid"|11234567.890 \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/expected_ouput_csv/decimal_logicalType_invalid_scale.txt ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/expected_ouput_csv/decimal_logicalType_invalid_scale.txt b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/expected_ouput_csv/decimal_logicalType_invalid_scale.txt deleted file mode 100644 index 9506ad4..0000000 --- a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/expected_ouput_csv/decimal_logicalType_invalid_scale.txt +++ /dev/null @@ -1 +0,0 @@ -"fake_transactionid"|11234567.89 \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/expected_ouput_csv/decimal_logicalType_valid_scale_with_default.txt ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/expected_ouput_csv/decimal_logicalType_valid_scale_with_default.txt b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/expected_ouput_csv/decimal_logicalType_valid_scale_with_default.txt deleted file mode 100644 index 2309e71..0000000 --- a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/expected_ouput_csv/decimal_logicalType_valid_scale_with_default.txt +++ /dev/null @@ -1 +0,0 @@ -"fake_transactionid"|0.000 \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/expected_ouput_csv/decimal_logicalType_with_default.txt ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/expected_ouput_csv/decimal_logicalType_with_default.txt b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/expected_ouput_csv/decimal_logicalType_with_default.txt deleted file mode 100644 index 3a9689c..0000000 --- a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/expected_ouput_csv/decimal_logicalType_with_default.txt +++ /dev/null @@ -1 +0,0 @@ -"fake_transactionid"|0 \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/expected_ouput_csv/primitive_types.txt ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/expected_ouput_csv/primitive_types.txt b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/expected_ouput_csv/primitive_types.txt deleted file mode 100644 index 77f353f..0000000 --- a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/expected_ouput_csv/primitive_types.txt +++ /dev/null @@ -1 +0,0 @@ -"this is a simple string."|10|21474836470|1.7976931348623157E308|true \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/expected_ouput_csv/primitive_types_with_matching_default.txt ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/expected_ouput_csv/primitive_types_with_matching_default.txt b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/expected_ouput_csv/primitive_types_with_matching_default.txt deleted file mode 100644 index 095f81e..0000000 --- a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/expected_ouput_csv/primitive_types_with_matching_default.txt +++ /dev/null @@ -1 +0,0 @@ -"default_string"|1234|21474836470|1.7976931348623157E308|true \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/expected_ouput_csv/union_null_last_field_with_default.txt ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/expected_ouput_csv/union_null_last_field_with_default.txt b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/expected_ouput_csv/union_null_last_field_with_default.txt deleted file mode 100644 index 83cbf75..0000000 --- a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/expected_ouput_csv/union_null_last_field_with_default.txt +++ /dev/null @@ -1 +0,0 @@ -andrew|13| \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/expected_ouput_csv/union_null_middle_field_with_default.txt ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/expected_ouput_csv/union_null_middle_field_with_default.txt b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/expected_ouput_csv/union_null_middle_field_with_default.txt deleted file mode 100644 index 1b03c97..0000000 --- a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/expected_ouput_csv/union_null_middle_field_with_default.txt +++ /dev/null @@ -1 +0,0 @@ -andrew|21474|blue \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/expected_ouput_csv/union_with_default.txt ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/expected_ouput_csv/union_with_default.txt b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/expected_ouput_csv/union_with_default.txt deleted file mode 100644 index 9c7abb5..0000000 --- a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/expected_ouput_csv/union_with_default.txt +++ /dev/null @@ -1 +0,0 @@ -andrew|13|blue \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_avro/decimal_logicalType_invalid_scale_with_default.txt ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_avro/decimal_logicalType_invalid_scale_with_default.txt b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_avro/decimal_logicalType_invalid_scale_with_default.txt deleted file mode 100644 index 54ba8b1..0000000 --- a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_avro/decimal_logicalType_invalid_scale_with_default.txt +++ /dev/null @@ -1,16 +0,0 @@ -{ - "name": "trx_table", - "type": "record", - "fields": [ - { - "name": "transactionid", - "type": ["string", "null"] - }, { - "name": "amount", - "type": "bytes", - "logicalType": "decimal", - "precision": 10, - "scale": 13, - "default": 0.0 - }] -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_avro/decimal_logicalType_valid_scale_with_default.txt ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_avro/decimal_logicalType_valid_scale_with_default.txt b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_avro/decimal_logicalType_valid_scale_with_default.txt deleted file mode 100644 index 8385fb1..0000000 --- a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_avro/decimal_logicalType_valid_scale_with_default.txt +++ /dev/null @@ -1,16 +0,0 @@ -{ - "name": "trx_table", - "type": "record", - "fields": [ - { - "name": "transactionid", - "type": ["string", "null"] - }, { - "name": "amount", - "type": "bytes", - "logicalType": "decimal", - "precision": 10, - "scale": 3, - "default": 0.0 - }] -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_avro/decimal_logicalType_valid_scale_with_no_default.txt ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_avro/decimal_logicalType_valid_scale_with_no_default.txt b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_avro/decimal_logicalType_valid_scale_with_no_default.txt deleted file mode 100644 index 9878590..0000000 --- a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_avro/decimal_logicalType_valid_scale_with_no_default.txt +++ /dev/null @@ -1,15 +0,0 @@ -{ - "name": "trx_table", - "type": "record", - "fields": [ - { - "name": "transactionid", - "type": ["string", "null"] - }, { - "name": "amount", - "type": "bytes", - "logicalType": "decimal", - "precision": 10, - "scale": 3 - }] -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_avro/primitive_types_no_defaults.txt ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_avro/primitive_types_no_defaults.txt b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_avro/primitive_types_no_defaults.txt deleted file mode 100644 index 934a53c..0000000 --- a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_avro/primitive_types_no_defaults.txt +++ /dev/null @@ -1,11 +0,0 @@ -{ - "type":"record", - "name":"basic_primitive_type_check", - "fields":[ - {"name":"string","type":"string"}, - {"name":"integer","type":"int"}, - {"name":"long","type":"long"}, - {"name":"double","type":"double"}, - {"name":"boolean","type":"boolean"} - ] -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_avro/primitive_types_union_with_defaults.txt ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_avro/primitive_types_union_with_defaults.txt b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_avro/primitive_types_union_with_defaults.txt deleted file mode 100644 index abc80ca..0000000 --- a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_avro/primitive_types_union_with_defaults.txt +++ /dev/null @@ -1,11 +0,0 @@ -{ - "type":"record", - "name":"basic_primitive_type_check", - "fields":[ - {"name":"string","type":["null","string"],"default":null}, - {"name":"integer","type":["null","int"],"default":null}, - {"name":"long","type":["null","long"],"default":null}, - {"name":"double","type":["null","double"],"default":null}, - {"name":"boolean","type":["null","boolean"],"default":null} - ] - } http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_avro/primitive_types_with_matching_default.txt ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_avro/primitive_types_with_matching_default.txt b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_avro/primitive_types_with_matching_default.txt deleted file mode 100644 index b3ea951..0000000 --- a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_avro/primitive_types_with_matching_default.txt +++ /dev/null @@ -1,11 +0,0 @@ -{ - "type":"record", - "name":"basic_primitive_type_check", - "fields":[ - {"name":"string","type":"string","default":"default_string"}, - {"name":"integer","type":"int","default":1234}, - {"name":"long","type":"long","default":21474836470}, - {"name":"double","type":"double","default":1.7976931348623157E308}, - {"name":"boolean","type":"boolean","default":true} - ] - } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_avro/primitive_types_with_mismatch_default.txt ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_avro/primitive_types_with_mismatch_default.txt b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_avro/primitive_types_with_mismatch_default.txt deleted file mode 100644 index e8f0e28..0000000 --- a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_avro/primitive_types_with_mismatch_default.txt +++ /dev/null @@ -1,11 +0,0 @@ -{ - "type":"record", - "name":"basic_primitive_type_check", - "fields":[ - {"name":"string","type":"string","default":1234}, - {"name":"integer","type":"int","default":"mismatch_int"}, - {"name":"long","type":"long","default":"mismatch_long"}, - {"name":"double","type":"double","default":"mismatch_double"}, - {"name":"boolean","type":"boolean","default":"mismatch_boolean"} - ] - } http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_avro/union_and_matching_defaults.txt ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_avro/union_and_matching_defaults.txt b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_avro/union_and_matching_defaults.txt deleted file mode 100644 index 442a3a4..0000000 --- a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_avro/union_and_matching_defaults.txt +++ /dev/null @@ -1,18 +0,0 @@ -{ - "namespace": "example.avro", - "type": "record", - "name": "User", - "fields": [{ - "name": "name", - "type": "string", - "default": "default_name" - }, { - "name": "favorite_number", - "type": "int", - "default": 21474 - }, { - "name": "favorite_color", - "type": ["null", "string"], - "default": null - }] -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_avro/union_and_mismatch_defaults.txt ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_avro/union_and_mismatch_defaults.txt b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_avro/union_and_mismatch_defaults.txt deleted file mode 100644 index 5222074..0000000 --- a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_avro/union_and_mismatch_defaults.txt +++ /dev/null @@ -1,18 +0,0 @@ -{ - "namespace": "example.avro", - "type": "record", - "name": "User", - "fields": [{ - "name": "name", - "type": "string", - "default": "default_name" - }, { - "name": "favorite_number", - "type": "int", - "default": "mismatched_int_default" - }, { - "name": "favorite_color", - "type": ["null", "string"], - "default": null - }] -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_csv/decimal_logicalType.txt ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_csv/decimal_logicalType.txt b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_csv/decimal_logicalType.txt deleted file mode 100644 index 1a53f85..0000000 --- a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_csv/decimal_logicalType.txt +++ /dev/null @@ -1 +0,0 @@ -"fake_transactionid"|11234567.890 \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_csv/decimal_logicalType_missing_value.txt ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_csv/decimal_logicalType_missing_value.txt b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_csv/decimal_logicalType_missing_value.txt deleted file mode 100644 index 1ee2a9b..0000000 --- a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_csv/decimal_logicalType_missing_value.txt +++ /dev/null @@ -1 +0,0 @@ -"fake_transactionid"| \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_csv/primitive_types.txt ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_csv/primitive_types.txt b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_csv/primitive_types.txt deleted file mode 100644 index 77f353f..0000000 --- a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_csv/primitive_types.txt +++ /dev/null @@ -1 +0,0 @@ -"this is a simple string."|10|21474836470|1.7976931348623157E308|true \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_csv/primitive_types_with_matching_default.txt ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_csv/primitive_types_with_matching_default.txt b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_csv/primitive_types_with_matching_default.txt deleted file mode 100644 index b60c01b..0000000 --- a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_csv/primitive_types_with_matching_default.txt +++ /dev/null @@ -1 +0,0 @@ -"default_string"||21474836470||true \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_csv/union_null_last_field_with_default.txt ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_csv/union_null_last_field_with_default.txt b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_csv/union_null_last_field_with_default.txt deleted file mode 100644 index 83cbf75..0000000 --- a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_csv/union_null_last_field_with_default.txt +++ /dev/null @@ -1 +0,0 @@ -andrew|13| \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_csv/union_null_middle_field_with_default.txt ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_csv/union_null_middle_field_with_default.txt b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_csv/union_null_middle_field_with_default.txt deleted file mode 100644 index 5a706ac..0000000 --- a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_csv/union_null_middle_field_with_default.txt +++ /dev/null @@ -1 +0,0 @@ -andrew||blue \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_csv/union_with_default.txt ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_csv/union_with_default.txt b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_csv/union_with_default.txt deleted file mode 100644 index 9c7abb5..0000000 --- a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_csv/union_with_default.txt +++ /dev/null @@ -1 +0,0 @@ -andrew|13|blue \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_csv/union_with_missing_value.txt ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_csv/union_with_missing_value.txt b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_csv/union_with_missing_value.txt deleted file mode 100644 index 5a706ac..0000000 --- a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_csv/union_with_missing_value.txt +++ /dev/null @@ -1 +0,0 @@ -andrew||blue \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/pom.xml b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/pom.xml index d99dc64..3cfae30 100644 --- a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/pom.xml +++ b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/pom.xml @@ -23,6 +23,18 @@ <packaging>jar</packaging> <dependencies> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-schema-registry-service-api</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-record-serialization-service-api</artifactId> + </dependency> + <dependency> + <groupId>org.apache.avro</groupId> + <artifactId>avro</artifactId> + </dependency> <dependency> <groupId>org.apache.nifi</groupId> <artifactId>nifi-api</artifactId> http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/main/java/org/apache/nifi/schemaregistry/services/AvroSchemaRegistry.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/main/java/org/apache/nifi/schemaregistry/services/AvroSchemaRegistry.java b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/main/java/org/apache/nifi/schemaregistry/services/AvroSchemaRegistry.java new file mode 100644 index 0000000..13b1d5d --- /dev/null +++ b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/main/java/org/apache/nifi/schemaregistry/services/AvroSchemaRegistry.java @@ -0,0 +1,217 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.schemaregistry.services; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import org.apache.avro.LogicalType; +import org.apache.avro.Schema; +import org.apache.avro.Schema.Field; +import org.apache.avro.Schema.Type; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnDisabled; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.serialization.SimpleRecordSchema; +import org.apache.nifi.serialization.record.DataType; +import org.apache.nifi.serialization.record.RecordField; +import org.apache.nifi.serialization.record.RecordFieldType; +import org.apache.nifi.serialization.record.RecordSchema; + +@Tags({"schema", "registry", "avro", "json", "csv"}) +@CapabilityDescription("Provides a service for registering and accessing schemas. You can register a schema " + + "as a dynamic property where 'name' represents the schema name and 'value' represents the textual " + + "representation of the actual schema following the syntax and semantics of Avro's Schema format.") +public class AvroSchemaRegistry extends AbstractControllerService implements SchemaRegistry { + + private final Map<String, String> schemaNameToSchemaMap; + + private static final String LOGICAL_TYPE_DATE = "date"; + private static final String LOGICAL_TYPE_TIME_MILLIS = "time-millis"; + private static final String LOGICAL_TYPE_TIME_MICROS = "time-micros"; + private static final String LOGICAL_TYPE_TIMESTAMP_MILLIS = "timestamp-millis"; + private static final String LOGICAL_TYPE_TIMESTAMP_MICROS = "timestamp-micros"; + + + public AvroSchemaRegistry() { + this.schemaNameToSchemaMap = new HashMap<>(); + } + + @OnEnabled + public void enable(ConfigurationContext configuratiponContext) throws InitializationException { + this.schemaNameToSchemaMap.putAll(configuratiponContext.getProperties().entrySet().stream() + .filter(propEntry -> propEntry.getKey().isDynamic()) + .collect(Collectors.toMap(propEntry -> propEntry.getKey().getName(), propEntry -> propEntry.getValue()))); + } + + @Override + public String retrieveSchemaText(String schemaName) { + if (!this.schemaNameToSchemaMap.containsKey(schemaName)) { + throw new IllegalArgumentException("Failed to find schema; Name: '" + schemaName + "."); + } else { + return this.schemaNameToSchemaMap.get(schemaName); + } + } + + @Override + public String retrieveSchemaText(String schemaName, Map<String, String> attributes) { + throw new UnsupportedOperationException("This version of schema registry does not " + + "support this operation, since schemas are only identofied by name."); + } + + @Override + @OnDisabled + public void close() throws Exception { + this.schemaNameToSchemaMap.clear(); + } + + @Override + protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) { + return new PropertyDescriptor.Builder() + .name(propertyDescriptorName) + .required(false) + .addValidator(new AvroSchemaValidator()) + .dynamic(true) + .expressionLanguageSupported(true) + .build(); + } + + + @Override + public RecordSchema retrieveSchema(String schemaName) { + final String schemaText = this.retrieveSchemaText(schemaName); + final Schema schema = new Schema.Parser().parse(schemaText); + return createRecordSchema(schema); + } + + /** + * Converts an Avro Schema to a RecordSchema + * + * @param avroSchema the Avro Schema to convert + * @return the Corresponding Record Schema + */ + private RecordSchema createRecordSchema(final Schema avroSchema) { + final List<RecordField> recordFields = new ArrayList<>(avroSchema.getFields().size()); + for (final Field field : avroSchema.getFields()) { + final String fieldName = field.name(); + final DataType dataType = determineDataType(field.schema()); + recordFields.add(new RecordField(fieldName, dataType)); + } + + final RecordSchema recordSchema = new SimpleRecordSchema(recordFields); + return recordSchema; + } + + /** + * Returns a DataType for the given Avro Schema + * + * @param avroSchema the Avro Schema to convert + * @return a Data Type that corresponds to the given Avro Schema + */ + private DataType determineDataType(final Schema avroSchema) { + final Type avroType = avroSchema.getType(); + + final LogicalType logicalType = avroSchema.getLogicalType(); + if (logicalType != null) { + final String logicalTypeName = logicalType.getName(); + switch (logicalTypeName) { + case LOGICAL_TYPE_DATE: + return RecordFieldType.DATE.getDataType(); + case LOGICAL_TYPE_TIME_MILLIS: + case LOGICAL_TYPE_TIME_MICROS: + return RecordFieldType.TIME.getDataType(); + case LOGICAL_TYPE_TIMESTAMP_MILLIS: + case LOGICAL_TYPE_TIMESTAMP_MICROS: + return RecordFieldType.TIMESTAMP.getDataType(); + } + } + + switch (avroType) { + case ARRAY: + return RecordFieldType.ARRAY.getArrayDataType(determineDataType(avroSchema.getElementType())); + case BYTES: + case FIXED: + return RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.BYTE.getDataType()); + case BOOLEAN: + return RecordFieldType.BOOLEAN.getDataType(); + case DOUBLE: + return RecordFieldType.DOUBLE.getDataType(); + case ENUM: + case STRING: + return RecordFieldType.STRING.getDataType(); + case FLOAT: + return RecordFieldType.FLOAT.getDataType(); + case INT: + return RecordFieldType.INT.getDataType(); + case LONG: + return RecordFieldType.LONG.getDataType(); + case RECORD: { + final List<Field> avroFields = avroSchema.getFields(); + final List<RecordField> recordFields = new ArrayList<>(avroFields.size()); + + for (final Field field : avroFields) { + final String fieldName = field.name(); + final Schema fieldSchema = field.schema(); + final DataType fieldType = determineDataType(fieldSchema); + recordFields.add(new RecordField(fieldName, fieldType)); + } + + final RecordSchema recordSchema = new SimpleRecordSchema(recordFields); + return RecordFieldType.RECORD.getRecordDataType(recordSchema); + } + case NULL: + case MAP: + return RecordFieldType.RECORD.getDataType(); + case UNION: { + final List<Schema> nonNullSubSchemas = avroSchema.getTypes().stream() + .filter(s -> s.getType() != Type.NULL) + .collect(Collectors.toList()); + + if (nonNullSubSchemas.size() == 1) { + return determineDataType(nonNullSubSchemas.get(0)); + } + + final List<DataType> possibleChildTypes = new ArrayList<>(nonNullSubSchemas.size()); + for (final Schema subSchema : nonNullSubSchemas) { + final DataType childDataType = determineDataType(subSchema); + possibleChildTypes.add(childDataType); + } + + return RecordFieldType.CHOICE.getChoiceDataType(possibleChildTypes); + } + } + + return null; + } + + /* + * For this implementation 'attributes' argument is ignored since the underlying storage mechanisms + * is based strictly on key/value pairs. In other implementation additional attributes may play a role (e.g., version id,) + */ + @Override + public RecordSchema retrieveSchema(String schemaName, Map<String, String> attributes) { + return this.retrieveSchema(schemaName); + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/main/java/org/apache/nifi/schemaregistry/services/AvroSchemaValidator.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/main/java/org/apache/nifi/schemaregistry/services/AvroSchemaValidator.java b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/main/java/org/apache/nifi/schemaregistry/services/AvroSchemaValidator.java new file mode 100644 index 0000000..32b700f --- /dev/null +++ b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/main/java/org/apache/nifi/schemaregistry/services/AvroSchemaValidator.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.schemaregistry.services; + +import org.apache.avro.Schema; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.components.Validator; + +public class AvroSchemaValidator implements Validator { + + @Override + public ValidationResult validate(final String subject, final String input, final ValidationContext context) { + if (context.isExpressionLanguagePresent(input)) { + return new ValidationResult.Builder() + .input(input) + .subject(subject) + .valid(true) + .explanation("Expression Language is present") + .build(); + } + + try { + new Schema.Parser().parse(input); + + return new ValidationResult.Builder() + .input(input) + .subject(subject) + .valid(true) + .explanation("Schema is valid") + .build(); + } catch (final Exception e) { + return new ValidationResult.Builder() + .input(input) + .subject(subject) + .valid(false) + .explanation("Not a valid Avro Schema: " + e.getMessage()) + .build(); + } + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/main/java/org/apache/nifi/schemaregistry/services/SchemaRegistry.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/main/java/org/apache/nifi/schemaregistry/services/SchemaRegistry.java b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/main/java/org/apache/nifi/schemaregistry/services/SchemaRegistry.java deleted file mode 100644 index fd5d0c5..0000000 --- a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/main/java/org/apache/nifi/schemaregistry/services/SchemaRegistry.java +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.schemaregistry.services; - -import java.util.Properties; - -import org.apache.nifi.controller.ControllerService; - -/** - * Represents {@link ControllerService} strategy to expose internal and/or - * integrate with external Schema Registry - */ -public interface SchemaRegistry extends ControllerService, AutoCloseable { - - public static final String SCHEMA_NAME_ATTR = "schema.name"; - - - /** - * Retrieves and returns the textual representation of the schema based on - * the provided name of the schema available in Schema Registry. Will throw - * an runtime exception if schema can not be found. - */ - String retrieveSchemaText(String schemaName); - - /** - * Retrieves and returns the textual representation of the schema based on - * the provided name of the schema available in Schema Registry and optional - * additional attributes. Will throw an runtime exception if schema can not - * be found. - */ - String retrieveSchemaText(String schemaName, Properties attributes); -} http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/main/java/org/apache/nifi/schemaregistry/services/SimpleKeyValueSchemaRegistry.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/main/java/org/apache/nifi/schemaregistry/services/SimpleKeyValueSchemaRegistry.java b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/main/java/org/apache/nifi/schemaregistry/services/SimpleKeyValueSchemaRegistry.java deleted file mode 100644 index aaedea2..0000000 --- a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/main/java/org/apache/nifi/schemaregistry/services/SimpleKeyValueSchemaRegistry.java +++ /dev/null @@ -1,96 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.schemaregistry.services; - -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Properties; -import java.util.stream.Collectors; - -import org.apache.nifi.annotation.documentation.CapabilityDescription; -import org.apache.nifi.annotation.documentation.Tags; -import org.apache.nifi.annotation.lifecycle.OnDisabled; -import org.apache.nifi.annotation.lifecycle.OnEnabled; -import org.apache.nifi.components.PropertyDescriptor; -import org.apache.nifi.controller.AbstractControllerService; -import org.apache.nifi.controller.ConfigurationContext; -import org.apache.nifi.processor.util.StandardValidators; -import org.apache.nifi.reporting.InitializationException; - -@Tags({ "schema", "registry", "avro", "json", "csv" }) -@CapabilityDescription("Provides a service for registering and accessing schemas. You can register schema " - + "as a dynamic property where 'name' represents the schema name and 'value' represents the textual " - + "representation of the actual schema.") -public class SimpleKeyValueSchemaRegistry extends AbstractControllerService implements SchemaRegistry { - - private static final List<PropertyDescriptor> propertyDescriptors; - - static { - propertyDescriptors = Collections.emptyList(); - } - - private final Map<String, String> schemaNameToSchemaMap; - - public SimpleKeyValueSchemaRegistry() { - this.schemaNameToSchemaMap = new HashMap<>(); - } - - @OnEnabled - public void enable(ConfigurationContext configuratiponContext) throws InitializationException { - this.schemaNameToSchemaMap.putAll(configuratiponContext.getProperties().entrySet().stream() - .filter(propEntry -> propEntry.getKey().isDynamic()) - .collect(Collectors.toMap(propEntry -> propEntry.getKey().getName(), propEntry -> propEntry.getValue()))); - } - - /** - * - */ - @Override - public String retrieveSchemaText(String schemaName) { - if (!this.schemaNameToSchemaMap.containsKey(schemaName)) { - throw new IllegalArgumentException("Failed to find schema; Name: '" + schemaName + "."); - } else { - return this.schemaNameToSchemaMap.get(schemaName); - } - } - - @Override - public String retrieveSchemaText(String schemaName, Properties attributes) { - throw new UnsupportedOperationException("This version of schema registry does not " - + "support this operation, since schemas are only identofied by name."); - } - - @Override - @OnDisabled - public void close() throws Exception { - this.schemaNameToSchemaMap.clear(); - } - - @Override - protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) { - return new PropertyDescriptor.Builder().required(false).name(propertyDescriptorName) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR).dynamic(true).expressionLanguageSupported(true) - .build(); - } - - @Override - protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { - return propertyDescriptors; - } -} http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService index 1775b76..a000cd7 100644 --- a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService +++ b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService @@ -12,4 +12,4 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -org.apache.nifi.schemaregistry.services.SimpleKeyValueSchemaRegistry \ No newline at end of file +org.apache.nifi.schemaregistry.services.AvroSchemaRegistry \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/test/java/org/apache/nifi/schemaregistry/services/SimpleKeyValueSchemaRegistryTest.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/test/java/org/apache/nifi/schemaregistry/services/SimpleKeyValueSchemaRegistryTest.java b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/test/java/org/apache/nifi/schemaregistry/services/SimpleKeyValueSchemaRegistryTest.java deleted file mode 100644 index 29179ab..0000000 --- a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/test/java/org/apache/nifi/schemaregistry/services/SimpleKeyValueSchemaRegistryTest.java +++ /dev/null @@ -1,70 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.schemaregistry.services; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.fail; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -import java.util.HashMap; -import java.util.Map; - -import org.apache.nifi.components.PropertyDescriptor; -import org.apache.nifi.controller.ConfigurationContext; -import org.apache.nifi.schemaregistry.services.SchemaRegistry; -import org.apache.nifi.schemaregistry.services.SimpleKeyValueSchemaRegistry; -import org.junit.Test; - -public class SimpleKeyValueSchemaRegistryTest { - - @Test - public void validateSchemaRegistrationFromrDynamicProperties() throws Exception { - String schemaName = "fooSchema"; - ConfigurationContext configContext = mock(ConfigurationContext.class); - Map<PropertyDescriptor, String> properties = new HashMap<>(); - PropertyDescriptor fooSchema = new PropertyDescriptor.Builder() - .name(schemaName) - .dynamic(true) - .build(); - String fooSchemaText = "{\"namespace\": \"example.avro\", " + "\"type\": \"record\", " + "\"name\": \"User\", " - + "\"fields\": [ " + "{\"name\": \"name\", \"type\": [\"string\", \"null\"]}, " - + "{\"name\": \"favorite_number\", \"type\": [\"int\", \"null\"]}, " - + "{\"name\": \"foo\", \"type\": [\"int\", \"null\"]}, " - + "{\"name\": \"favorite_color\", \"type\": [\"string\", \"null\"]} " + "]" + "}"; - PropertyDescriptor barSchema = new PropertyDescriptor.Builder() - .name("barSchema") - .dynamic(false) - .build(); - properties.put(fooSchema, fooSchemaText); - properties.put(barSchema, ""); - when(configContext.getProperties()).thenReturn(properties); - SchemaRegistry delegate = new SimpleKeyValueSchemaRegistry(); - ((SimpleKeyValueSchemaRegistry)delegate).enable(configContext); - - String locatedSchemaText = delegate.retrieveSchemaText(schemaName); - assertEquals(fooSchemaText, locatedSchemaText); - try { - locatedSchemaText = delegate.retrieveSchemaText("barSchema"); - fail(); - } catch (Exception e) { - // ignore - } - delegate.close(); - } - -} http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/test/java/org/apache/nifi/schemaregistry/services/TestAvroSchemaRegistry.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/test/java/org/apache/nifi/schemaregistry/services/TestAvroSchemaRegistry.java b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/test/java/org/apache/nifi/schemaregistry/services/TestAvroSchemaRegistry.java new file mode 100644 index 0000000..929aab9 --- /dev/null +++ b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/test/java/org/apache/nifi/schemaregistry/services/TestAvroSchemaRegistry.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.schemaregistry.services; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.serialization.record.RecordField; +import org.apache.nifi.serialization.record.RecordFieldType; +import org.apache.nifi.serialization.record.RecordSchema; +import org.junit.Test; + +public class TestAvroSchemaRegistry { + + @Test + public void validateSchemaRegistrationFromrDynamicProperties() throws Exception { + String schemaName = "fooSchema"; + ConfigurationContext configContext = mock(ConfigurationContext.class); + Map<PropertyDescriptor, String> properties = new HashMap<>(); + PropertyDescriptor fooSchema = new PropertyDescriptor.Builder() + .name(schemaName) + .dynamic(true) + .build(); + String fooSchemaText = "{\"namespace\": \"example.avro\", " + "\"type\": \"record\", " + "\"name\": \"User\", " + + "\"fields\": [ " + "{\"name\": \"name\", \"type\": [\"string\", \"null\"]}, " + + "{\"name\": \"favorite_number\", \"type\": [\"int\", \"null\"]}, " + + "{\"name\": \"foo\", \"type\": [\"int\", \"null\"]}, " + + "{\"name\": \"favorite_color\", \"type\": [\"string\", \"null\"]} " + "]" + "}"; + PropertyDescriptor barSchema = new PropertyDescriptor.Builder() + .name("barSchema") + .dynamic(false) + .build(); + properties.put(fooSchema, fooSchemaText); + properties.put(barSchema, ""); + when(configContext.getProperties()).thenReturn(properties); + SchemaRegistry delegate = new AvroSchemaRegistry(); + ((AvroSchemaRegistry) delegate).enable(configContext); + + String locatedSchemaText = delegate.retrieveSchemaText(schemaName); + assertEquals(fooSchemaText, locatedSchemaText); + try { + locatedSchemaText = delegate.retrieveSchemaText("barSchema"); + fail(); + } catch (Exception e) { + // ignore + } + delegate.close(); + } + + + @Test + public void validateRecordSchemaRetrieval() throws Exception { + String schemaName = "fooSchema"; + ConfigurationContext configContext = mock(ConfigurationContext.class); + Map<PropertyDescriptor, String> properties = new HashMap<>(); + PropertyDescriptor fooSchema = new PropertyDescriptor.Builder() + .name(schemaName) + .dynamic(true) + .build(); + String fooSchemaText = "{\"namespace\": \"example.avro\", " + "\"type\": \"record\", " + "\"name\": \"User\", " + + "\"fields\": [ " + "{\"name\": \"name\", \"type\": [\"string\", \"null\"]}, " + + "{\"name\": \"favorite_number\", \"type\": \"int\"}, " + + "{\"name\": \"foo\", \"type\": \"boolean\"}, " + + "{\"name\": \"favorite_color\", \"type\": [\"string\", \"null\"]} " + "]" + "}"; + PropertyDescriptor barSchema = new PropertyDescriptor.Builder() + .name("barSchema") + .dynamic(false) + .build(); + properties.put(fooSchema, fooSchemaText); + properties.put(barSchema, ""); + when(configContext.getProperties()).thenReturn(properties); + SchemaRegistry delegate = new AvroSchemaRegistry(); + ((AvroSchemaRegistry) delegate).enable(configContext); + + RecordSchema locatedSchema = delegate.retrieveSchema(schemaName); + List<RecordField> recordFields = locatedSchema.getFields(); + assertEquals(4, recordFields.size()); + assertEquals(RecordFieldType.STRING.getDataType(), recordFields.get(0).getDataType()); + assertEquals("name", recordFields.get(0).getFieldName()); + assertEquals(RecordFieldType.INT.getDataType(), recordFields.get(1).getDataType()); + assertEquals("favorite_number", recordFields.get(1).getFieldName()); + assertEquals(RecordFieldType.BOOLEAN.getDataType(), recordFields.get(2).getDataType()); + assertEquals("foo", recordFields.get(2).getFieldName()); + assertEquals(RecordFieldType.STRING.getDataType(), recordFields.get(3).getDataType()); + assertEquals("favorite_color", recordFields.get(3).getFieldName()); + delegate.close(); + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-registry-bundle/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-registry-bundle/pom.xml b/nifi-nar-bundles/nifi-registry-bundle/pom.xml index fa6d30f..5287a02 100644 --- a/nifi-nar-bundles/nifi-registry-bundle/pom.xml +++ b/nifi-nar-bundles/nifi-registry-bundle/pom.xml @@ -21,12 +21,7 @@ <packaging>pom</packaging> <description>A bundle of processors that rely on external service to obtain schema.</description> - <properties> - <commons-lang3.version>3.0</commons-lang3.version> - </properties> - <modules> - <module>nifi-registry-processors</module> <module>nifi-registry-service</module> <module>nifi-registry-nar</module> </modules> http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml index e390097..295ae96 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml @@ -290,8 +290,8 @@ </dependency> <dependency> <groupId>org.apache.calcite</groupId> - <artifactId>calcite-example-csv</artifactId> - <version>1.11.0</version> + <artifactId>calcite-core</artifactId> + <version>1.12.0</version> </dependency> </dependencies> http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryFlowFile.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryFlowFile.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryFlowFile.java index 833a5d6..83a3d4b 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryFlowFile.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryFlowFile.java @@ -98,19 +98,22 @@ import org.apache.nifi.util.StopWatch; + "that is selected being routed to the relationship whose name is the property name") public class QueryFlowFile extends AbstractProcessor { static final PropertyDescriptor RECORD_READER_FACTORY = new PropertyDescriptor.Builder() - .name("Record Reader") + .name("record-reader") + .displayName("Record Reader") .description("Specifies the Controller Service to use for parsing incoming data and determining the data's schema") .identifiesControllerService(RowRecordReaderFactory.class) .required(true) .build(); static final PropertyDescriptor RECORD_WRITER_FACTORY = new PropertyDescriptor.Builder() - .name("Record Writer") + .name("record-writer") + .displayName("Record Writer") .description("Specifies the Controller Service to use for writing results to a FlowFile") .identifiesControllerService(RecordSetWriterFactory.class) .required(true) .build(); static final PropertyDescriptor INCLUDE_ZERO_RECORD_FLOWFILES = new PropertyDescriptor.Builder() - .name("Include Zero Record FlowFiles") + .name("include-zero-record-flowfiles") + .displayName("Include Zero Record FlowFiles") .description("When running the SQL statement against an incoming FlowFile, if the result has no data, " + "this property specifies whether or not a FlowFile will be sent to the corresponding relationship") .expressionLanguageSupported(false) @@ -119,7 +122,8 @@ public class QueryFlowFile extends AbstractProcessor { .required(true) .build(); static final PropertyDescriptor CACHE_SCHEMA = new PropertyDescriptor.Builder() - .name("Cache Schema") + .name("cache-schema") + .displayName("Cache Schema") .description("Parsing the SQL query and deriving the FlowFile's schema is relatively expensive. If this value is set to true, " + "the Processor will cache these values so that the Processor is much more efficient and much faster. However, if this is done, " + "then the schema that is derived for the first FlowFile processed must apply to all FlowFiles. If all FlowFiles will not have the exact " @@ -391,7 +395,7 @@ public class QueryFlowFile extends AbstractProcessor { final Supplier<CalciteConnection> connectionSupplier = () -> { final Properties properties = new Properties(); - properties.put(CalciteConnectionProperty.LEX.camelName(), Lex.JAVA.name()); + properties.put(CalciteConnectionProperty.LEX.camelName(), Lex.MYSQL_ANSI.name()); try { final Connection connection = DriverManager.getConnection("jdbc:calcite:", properties); @@ -491,6 +495,15 @@ public class QueryFlowFile extends AbstractProcessor { private static class SqlValidator implements Validator { @Override public ValidationResult validate(final String subject, final String input, final ValidationContext context) { + if (context.isExpressionLanguagePresent(input)) { + return new ValidationResult.Builder() + .input(input) + .subject(subject) + .valid(true) + .explanation("Expression Language Present") + .build(); + } + final String substituted = context.newPropertyValue(input).evaluateAttributeExpressions().getValue(); final SqlParser parser = SqlParser.create(substituted); try { http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/queryflowfile/FlowFileEnumerator.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/queryflowfile/FlowFileEnumerator.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/queryflowfile/FlowFileEnumerator.java index 1a62d14..7daa002 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/queryflowfile/FlowFileEnumerator.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/queryflowfile/FlowFileEnumerator.java @@ -99,10 +99,6 @@ public class FlowFileEnumerator<InternalType> implements Enumerator<Object> { } // Create a new Object array that contains only the desired fields. - if (row.length <= fields.length) { - return row; - } - final Object[] filtered = new Object[fields.length]; for (int i = 0; i < fields.length; i++) { final int indexToKeep = fields[i]; @@ -125,7 +121,7 @@ public class FlowFileEnumerator<InternalType> implements Enumerator<Object> { rawIn = session.read(flowFile); try { - recordParser = recordParserFactory.createRecordReader(rawIn, logger); + recordParser = recordParserFactory.createRecordReader(flowFile, rawIn, logger); } catch (final MalformedRecordException | IOException e) { throw new ProcessException("Failed to reset stream", e); } http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/queryflowfile/FlowFileTable.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/queryflowfile/FlowFileTable.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/queryflowfile/FlowFileTable.java index a23dcfa..27f0c42 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/queryflowfile/FlowFileTable.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/queryflowfile/FlowFileTable.java @@ -136,7 +136,7 @@ public class FlowFileTable<S, E> extends AbstractTable implements QueryableTable RecordSchema schema; try (final InputStream in = session.read(flowFile)) { - final RecordReader recordParser = recordParserFactory.createRecordReader(in, logger); + final RecordReader recordParser = recordParserFactory.createRecordReader(flowFile, in, logger); schema = recordParser.getSchema(); } catch (final MalformedRecordException | IOException e) { throw new ProcessException("Failed to determine schema of data records for " + flowFile, e); @@ -189,7 +189,7 @@ public class FlowFileTable<S, E> extends AbstractTable implements QueryableTable return typeFactory.createJavaType(String.class); case ARRAY: return typeFactory.createJavaType(Object[].class); - case OBJECT: + case RECORD: return typeFactory.createJavaType(Object.class); } http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.QueryFlowFile/additionalDetails.html ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.QueryFlowFile/additionalDetails.html b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.QueryFlowFile/additionalDetails.html index 1cc7923..0dffc0d 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.QueryFlowFile/additionalDetails.html +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.QueryFlowFile/additionalDetails.html @@ -41,7 +41,8 @@ </p> <p> - The SQL syntax that is supported by this Processor is ANSI SQL and is powered by Apache Calcite. + The SQL syntax that is supported by this Processor is ANSI SQL and is powered by Apache Calcite. Please + note that identifiers are quoted using double-quotes, and column names/labels are case-insensitive. </p> </body> </html> \ No newline at end of file
