twalthr commented on a change in pull request #15808:
URL: https://github.com/apache/flink/pull/15808#discussion_r645487560
##########
File path:
flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/RegistryAvroOptions.java
##########
@@ -37,4 +37,40 @@ private RegistryAvroOptions() {}
.noDefaultValue()
.withDescription(
"Subject name to write to the Schema Registry
service, required for sink");
+
+ public static final ConfigOption<String>
SCHEMA_REGISTRY_SSL_KEYSTORE_LOCATION =
+ ConfigOptions.key("schema-registry.keystore.location")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("Location / File of SSL keystore");
+
+ public static final ConfigOption<String>
SCHEMA_REGISTRY_SSL_KEYSTORE_PASSWORD =
+ ConfigOptions.key("schema-registry.keystore.password")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("Password for SSL keystore");
+
+ public static final ConfigOption<String>
SCHEMA_REGISTRY_SSL_TRUSTSTORE_LOCATION =
+ ConfigOptions.key("schema-registry.truststore.location")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("Location / File of SSL truststore");
+
+ public static final ConfigOption<String>
SCHEMA_REGISTRY_SSL_TRUSTSTORE_PASSWORD =
+ ConfigOptions.key("schema-registry.truststore.password")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("Password for SSL truststore");
+
+ public static final ConfigOption<String>
SCHEMA_REGISTRY_BASIC_AUTH_CREDENTIALS_SOURCE =
+ ConfigOptions.key("schema-registry.basic.auth.credentials.source")
Review comment:
Dots are for hierarchies, not for word separation. Rewrite to
`schema-registry.basic-auth.credentials-source`.
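
A minimal sketch of how the hyphenated Flink-facing keys could coexist with the dotted property names the registry client expects. The class `RegistryKeyMapping` and the method `toClientProperties` are hypothetical names used only for illustration, and the right-hand client property names are an assumption, not taken from this PR:

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical illustration only; not part of the PR. */
public class RegistryKeyMapping {

    // Flink-facing option key -> property name handed to the registry client.
    // The left side uses dots only for hierarchy and hyphens inside a word group.
    // The right side is an assumed client-side name and may differ in practice.
    static final Map<String, String> FLINK_TO_CLIENT = new LinkedHashMap<>();

    static {
        FLINK_TO_CLIENT.put(
                "schema-registry.basic-auth.credentials-source",
                "basic.auth.credentials.source");
        FLINK_TO_CLIENT.put(
                "schema-registry.basic-auth.user-info",
                "basic.auth.user.info");
    }

    /** Copies every configured option into a map keyed by the client property name. */
    static Map<String, String> toClientProperties(Map<String, String> tableOptions) {
        Map<String, String> result = new HashMap<>();
        for (Map.Entry<String, String> entry : FLINK_TO_CLIENT.entrySet()) {
            String value = tableOptions.get(entry.getKey());
            if (value != null) {
                // Options that were not set are simply skipped.
                result.put(entry.getValue(), value);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> options = new HashMap<>();
        options.put("schema-registry.basic-auth.credentials-source", "USER_INFO");
        options.put("schema-registry.basic-auth.user-info", "user:secret");
        System.out.println(toClientProperties(options));
    }
}
```

The point of the sketch is only that the user-facing key does not have to mirror the client property name, so the rename costs nothing on the client side.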
##########
File path:
flink-formats/flink-avro-confluent-registry/src/test/java/org/apache/flink/formats/avro/registry/confluent/ConfluentRegistryAvroDeserializationSchemaTest.java
##########
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro.registry.confluent;
+
+import org.apache.flink.formats.avro.generated.SimpleRecord;
+
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaBuilder;
+import org.apache.avro.generic.GenericRecord;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertTrue;
+
+/** Tests for {@link ConfluentRegistryAvroDeserializationSchema}. */
+public class ConfluentRegistryAvroDeserializationSchemaTest {
Review comment:
Tests should always be well chosen. I have the feeling that this test is not
very useful and can be dropped.
##########
File path:
flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/ConfluentRegistryAvroDeserializationSchema.java
##########
@@ -84,10 +86,47 @@ private ConfluentRegistryAvroDeserializationSchema(
*/
public static ConfluentRegistryAvroDeserializationSchema<GenericRecord>
forGeneric(
Schema schema, String url, int identityMapCapacity) {
+ return forGeneric(schema, url, identityMapCapacity, null);
+ }
+
+ /**
+ * Creates {@link ConfluentRegistryAvroDeserializationSchema} that
produces {@link
+ * GenericRecord} using provided reader schema and looks up writer schema
in Confluent Schema
+ * Registry.
+ *
+ * @param schema schema of produced records
+ * @param url url of schema registry to connect
+ * @param registryConfigs map with additional schema registry configs (for
example ssl
+ * properties: 'schema.registry.ssl.keystore.location',
Review comment:
Avoid code duplication and simplify to `map with additional schema registry
configs (for example for SSL properties).`
##########
File path:
flink-formats/flink-avro-confluent-registry/src/test/java/org/apache/flink/formats/avro/registry/confluent/ConfluentRegistryAvroSerializationSchemaTest.java
##########
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro.registry.confluent;
+
+import org.apache.flink.formats.avro.generated.SimpleRecord;
+
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaBuilder;
+import org.apache.avro.generic.GenericRecord;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertTrue;
+
+/** Tests for {@link ConfluentRegistryAvroSerializationSchema}. */
+public class ConfluentRegistryAvroSerializationSchemaTest {
Review comment:
Remove as well?
##########
File path:
flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/RegistryAvroOptions.java
##########
@@ -37,4 +37,40 @@ private RegistryAvroOptions() {}
.noDefaultValue()
.withDescription(
"Subject name to write to the Schema Registry
service, required for sink");
+
+ public static final ConfigOption<String>
SCHEMA_REGISTRY_SSL_KEYSTORE_LOCATION =
+ ConfigOptions.key("schema-registry.keystore.location")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("Location / File of SSL keystore");
+
+ public static final ConfigOption<String>
SCHEMA_REGISTRY_SSL_KEYSTORE_PASSWORD =
+ ConfigOptions.key("schema-registry.keystore.password")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("Password for SSL keystore");
+
+ public static final ConfigOption<String>
SCHEMA_REGISTRY_SSL_TRUSTSTORE_LOCATION =
+ ConfigOptions.key("schema-registry.truststore.location")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("Location / File of SSL truststore");
+
+ public static final ConfigOption<String>
SCHEMA_REGISTRY_SSL_TRUSTSTORE_PASSWORD =
+ ConfigOptions.key("schema-registry.truststore.password")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("Password for SSL truststore");
+
+ public static final ConfigOption<String>
SCHEMA_REGISTRY_BASIC_AUTH_CREDENTIALS_SOURCE =
+ ConfigOptions.key("schema-registry.basic.auth.credentials.source")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("Basic auth credentials source for schema
registry");
+
+ public static final ConfigOption<String>
SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO =
+ ConfigOptions.key("schema-registry.basic.auth.user.info")
Review comment:
Rewrite to `schema-registry.basic-auth.user-info`.
##########
File path:
flink-formats/flink-avro-confluent-registry/src/test/java/org/apache/flink/formats/avro/registry/confluent/RegistryAvroFormatFactoryTest.java
##########
@@ -127,6 +139,66 @@ public void testMissingSubjectForSink() {
createTableSink(SCHEMA, options);
}
+ @Test
+ public void testDeserializationSchemaWithOptionalProperties() {
+ final AvroRowDataDeserializationSchema expectedDeser =
Review comment:
Reduce code duplication and move shared parts with the test above into a
helper method.
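
A rough sketch of the kind of helper meant here, assuming the two tests differ only in a few extra options. The class name, method names, and all option values below are placeholders for illustration, not the actual test code:

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical illustration only; names and values are placeholders. */
public class SharedOptionsSketch {

    /** Options shared by every test case; built fresh so tests cannot affect each other. */
    static Map<String, String> getDefaultOptions() {
        Map<String, String> options = new HashMap<>();
        options.put("connector", "test-connector"); // placeholder value
        options.put("format", "test-format"); // placeholder value
        return options;
    }

    /** Shared options plus the optional properties that only the new test needs. */
    static Map<String, String> getOptionsWithOptionalProperties() {
        Map<String, String> options = getDefaultOptions();
        options.put("schema-registry.basic-auth.credentials-source", "USER_INFO");
        options.put("schema-registry.basic-auth.user-info", "user:pwd");
        return options;
    }
}
```

Each test would then call one of the two methods instead of repeating the map construction.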
##########
File path:
flink-formats/flink-avro-confluent-registry/src/test/java/org/apache/flink/formats/avro/registry/confluent/CachedSchemaCoderProviderTest.java
##########
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro.registry.confluent;
+
+import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
+import io.confluent.kafka.schemaregistry.client.rest.RestService;
+import org.junit.Test;
+import org.powermock.reflect.Whitebox;
+
+import javax.net.ssl.SSLSocketFactory;
+
+import java.net.URISyntaxException;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/** Tests for {@link CachedSchemaCoderProvider}. */
+public class CachedSchemaCoderProviderTest {
+
+ @Test
+ public void testThatSslIsNotInitializedIfSslPropertiesAreNotProvided() {
+ SSLSocketFactory sslSocketFactory = getSslSocketFactory(new
HashMap<>());
+
+ assertTrue(sslSocketFactory == null);
+ }
+
+ @Test
+ public void testThatSslIsInitializedIfSslPropertiesAreProvided() throws
URISyntaxException {
+ String keystoreFile = getAbsolutePath("/test-keystore.jks");
+ String keystorePassword = "123456";
+ Map<String, String> configs = new HashMap<>();
+ configs.put("schema.registry.ssl.keystore.location", keystoreFile);
+ configs.put("schema.registry.ssl.keystore.password", keystorePassword);
+ configs.put("schema.registry.ssl.truststore.location", keystoreFile);
+ configs.put("schema.registry.ssl.truststore.password",
keystorePassword);
+ SSLSocketFactory sslSocketFactory = getSslSocketFactory(configs);
+
+ assertFalse(sslSocketFactory == null);
Review comment:
Is it possible to also check what has been set?