This is an automated email from the ASF dual-hosted git repository.
sanjeevrk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new e4aea8c Fix validator logic to differentiate between serializer and
deserializer (#2523)
e4aea8c is described below
commit e4aea8cd04f8052ff91ba50294b25d01d8943d99
Author: Sanjeev Kulkarni <[email protected]>
AuthorDate: Thu Sep 6 11:15:49 2018 -0700
Fix validator logic to differentiate between serializer and deserializer
(#2523)
* Fix validator logic to differentiate between serializer and deserializer
* Expand to include backend and schema
* Fix buil
* Fix unittest
* Fixed unittest
---
.../pulsar/functions/instance/ContextImpl.java | 4 +-
.../pulsar/functions/instance/InstanceUtils.java | 37 +++++++++----
.../apache/pulsar/functions/sink/PulsarSink.java | 4 +-
.../pulsar/functions/source/PulsarSource.java | 4 +-
.../pulsar/functions/source/TopicSchema.java | 18 +++----
.../pulsar/functions/sink/PulsarSinkTest.java | 2 +-
.../pulsar/functions/source/PulsarSourceTest.java | 2 +-
.../functions/utils/validation/ValidatorImpls.java | 62 ++++++++++++++--------
8 files changed, 80 insertions(+), 53 deletions(-)
diff --git
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
index aec52fc..c4099f4 100644
---
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
+++
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
@@ -254,13 +254,13 @@ class ContextImpl implements Context, SinkContext,
SourceContext {
@SuppressWarnings("unchecked")
@Override
public <O> CompletableFuture<Void> publish(String topicName, O object) {
- return publish(topicName, object, (Schema<O>)
topicSchema.getSchema(topicName, object));
+ return publish(topicName, object, "");
}
@SuppressWarnings("unchecked")
@Override
public <O> CompletableFuture<Void> publish(String topicName, O object,
String schemaOrSerdeClassName) {
- return publish(topicName, object, (Schema<O>)
topicSchema.getSchema(topicName, object, schemaOrSerdeClassName));
+ return publish(topicName, object, (Schema<O>)
topicSchema.getSchema(topicName, object, schemaOrSerdeClassName, false));
}
@SuppressWarnings("unchecked")
diff --git
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceUtils.java
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceUtils.java
index b4a9bf9..86a9aa2 100644
---
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceUtils.java
+++
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceUtils.java
@@ -31,27 +31,42 @@ import net.jodah.typetools.TypeResolver;
@UtilityClass
public class InstanceUtils {
- public static SerDe<?> initializeSerDe(String serdeClassName, ClassLoader
clsLoader, Class<?> typeArg) {
+ public static SerDe<?> initializeSerDe(String serdeClassName, ClassLoader
clsLoader, Class<?> typeArg,
+ boolean deser) {
SerDe<?> serDe = createInstance(serdeClassName, clsLoader,
SerDe.class);
Class<?>[] inputSerdeTypeArgs =
TypeResolver.resolveRawArguments(SerDe.class, serDe.getClass());
- checkArgument(typeArg.isAssignableFrom(inputSerdeTypeArgs[0]),
- "Inconsistent types found between function input type and
input serde type: "
- + " function type = " + typeArg + " should be
assignable from "
- + inputSerdeTypeArgs[0]);
+ if (deser) {
+ checkArgument(typeArg.isAssignableFrom(inputSerdeTypeArgs[0]),
+ "Inconsistent types found between function input type and
serde type: "
+ + " function type = " + typeArg + " should be
assignable from "
+ + inputSerdeTypeArgs[0]);
+ } else {
+ checkArgument(inputSerdeTypeArgs[0].isAssignableFrom(typeArg),
+ "Inconsistent types found between function input type and
serde type: "
+ + " serde type = " + inputSerdeTypeArgs[0] + "
should be assignable from "
+ + typeArg);
+ }
return serDe;
}
- public static Schema<?> initializeCustomSchema(String schemaClassName,
ClassLoader clsLoader, Class<?> typeArg) {
+ public static Schema<?> initializeCustomSchema(String schemaClassName,
ClassLoader clsLoader, Class<?> typeArg,
+ boolean input) {
Schema<?> schema = createInstance(schemaClassName, clsLoader,
Schema.class);
Class<?>[] inputSerdeTypeArgs =
TypeResolver.resolveRawArguments(Schema.class, schema.getClass());
- checkArgument(typeArg.isAssignableFrom(inputSerdeTypeArgs[0]),
- "Inconsistent types found between function input type and
input schema type: "
- + " function type = " + typeArg + " should be
assignable from "
- + inputSerdeTypeArgs[0]);
-
+ if (input) {
+ checkArgument(typeArg.isAssignableFrom(inputSerdeTypeArgs[0]),
+ "Inconsistent types found between function type and schema
type: "
+ + " function type = " + typeArg + " should be
assignable from "
+ + inputSerdeTypeArgs[0]);
+ } else {
+ checkArgument(inputSerdeTypeArgs[0].isAssignableFrom(typeArg),
+ "Inconsistent types found between function type and schema
type: "
+ + " schema type = " + inputSerdeTypeArgs[0] + "
should be assignable from "
+ + typeArg);
+ }
return schema;
}
diff --git
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
index 5ec725c..835e288 100644
---
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
+++
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
@@ -277,10 +277,10 @@ public class PulsarSink<T> implements Sink<T> {
if (!StringUtils.isEmpty(pulsarSinkConfig.getSchemaType())) {
return (Schema<T>)
topicSchema.getSchema(pulsarSinkConfig.getTopic(), typeArg,
- pulsarSinkConfig.getSchemaType());
+ pulsarSinkConfig.getSchemaType(), false);
} else {
return (Schema<T>)
topicSchema.getSchema(pulsarSinkConfig.getTopic(), typeArg,
- pulsarSinkConfig.getSerdeClassName());
+ pulsarSinkConfig.getSerdeClassName(), false);
}
}
}
diff --git
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
index 244ab70..e1059f3 100644
---
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
+++
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
@@ -162,9 +162,9 @@ public class PulsarSource<T> extends PushSource<T>
implements MessageListener<T>
pulsarSourceConfig.getTopicSchema().forEach((topic, conf) -> {
Schema<T> schema;
if (conf.getSerdeClassName() != null &&
!conf.getSerdeClassName().isEmpty()) {
- schema = (Schema<T>) topicSchema.getSchema(topic, typeArg,
conf.getSerdeClassName());
+ schema = (Schema<T>) topicSchema.getSchema(topic, typeArg,
conf.getSerdeClassName(), true);
} else {
- schema = (Schema<T>) topicSchema.getSchema(topic, typeArg,
conf.getSchemaType());
+ schema = (Schema<T>) topicSchema.getSchema(topic, typeArg,
conf.getSchemaType(), true);
}
configs.put(topic,
ConsumerConfig.<T>
builder().schema(schema).isRegexPattern(conf.isRegexPattern()).build());
diff --git
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/TopicSchema.java
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/TopicSchema.java
index 1802ee5..2ac5b65 100644
---
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/TopicSchema.java
+++
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/TopicSchema.java
@@ -51,16 +51,12 @@ public class TopicSchema {
public static final String DEFAULT_SERDE =
"org.apache.pulsar.functions.api.utils.DefaultSerDe";
- public Schema<?> getSchema(String topic, Object object) {
- return getSchema(topic, object.getClass(), "");
+ public Schema<?> getSchema(String topic, Object object, String
schemaTypeOrClassName, boolean input) {
+ return getSchema(topic, object.getClass(), schemaTypeOrClassName,
input);
}
- public Schema<?> getSchema(String topic, Object object, String
schemaTypeOrClassName) {
- return getSchema(topic, object.getClass(), schemaTypeOrClassName);
- }
-
- public Schema<?> getSchema(String topic, Class<?> clazz, String
schemaTypeOrClassName) {
- return cachedSchemas.computeIfAbsent(topic, t ->
newSchemaInstance(topic, clazz, schemaTypeOrClassName));
+ public Schema<?> getSchema(String topic, Class<?> clazz, String
schemaTypeOrClassName, boolean input) {
+ return cachedSchemas.computeIfAbsent(topic, t ->
newSchemaInstance(topic, clazz, schemaTypeOrClassName, input));
}
public Schema<?> getSchema(String topic, Class<?> clazz,
Optional<SchemaType> schemaType) {
@@ -134,7 +130,7 @@ public class TopicSchema {
}
@SuppressWarnings("unchecked")
- private <T> Schema<T> newSchemaInstance(String topic, Class<T> clazz,
String schemaTypeOrClassName) {
+ private <T> Schema<T> newSchemaInstance(String topic, Class<T> clazz,
String schemaTypeOrClassName, boolean input) {
// The schemaTypeOrClassName can represent multiple thing, either a
schema type, a schema class name or a ser-de
// class name.
@@ -161,11 +157,11 @@ public class TopicSchema {
// First try with Schema
try {
return (Schema<T>)
InstanceUtils.initializeCustomSchema(schemaTypeOrClassName,
- Thread.currentThread().getContextClassLoader(), clazz);
+ Thread.currentThread().getContextClassLoader(), clazz,
input);
} catch (Throwable t) {
// Now try with Serde or just fail
SerDe<T> serDe = (SerDe<T>)
InstanceUtils.initializeSerDe(schemaTypeOrClassName,
- Thread.currentThread().getContextClassLoader(), clazz);
+ Thread.currentThread().getContextClassLoader(), clazz,
input);
return new SerDeSchema<>(serDe);
}
}
diff --git
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/PulsarSinkTest.java
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/PulsarSinkTest.java
index 766dee3..72e3c56 100644
---
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/PulsarSinkTest.java
+++
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/PulsarSinkTest.java
@@ -140,7 +140,7 @@ public class PulsarSinkTest {
fail("Should fail constructing java instance if function type is
inconsistent with serde type");
} catch (RuntimeException ex) {
log.error("RuntimeException: {}", ex, ex);
- assertTrue(ex.getMessage().startsWith("Inconsistent types found
between function input type and input serde type:"));
+ assertTrue(ex.getMessage().startsWith("Inconsistent types found
between function input type and serde type:"));
} catch (Exception ex) {
log.error("Exception: {}", ex, ex);
assertTrue(false);
diff --git
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/PulsarSourceTest.java
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/PulsarSourceTest.java
index 60f684f..e4825f2 100644
---
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/PulsarSourceTest.java
+++
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/PulsarSourceTest.java
@@ -159,7 +159,7 @@ public class PulsarSourceTest {
fail("Should fail constructing java instance if function type is
inconsistent with serde type");
} catch (RuntimeException ex) {
log.error("RuntimeException: {}", ex, ex);
- assertTrue(ex.getMessage().startsWith("Inconsistent types found
between function input type and input serde type:"));
+ assertTrue(ex.getMessage().startsWith("Inconsistent types found
between function input type and serde type:"));
} catch (Exception ex) {
log.error("Exception: {}", ex, ex);
assertTrue(false);
diff --git
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ValidatorImpls.java
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ValidatorImpls.java
index f264d2a..f60f3c0 100644
---
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ValidatorImpls.java
+++
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ValidatorImpls.java
@@ -381,7 +381,7 @@ public class ValidatorImpls {
// implements SerDe class
if (functionConfig.getCustomSerdeInputs() != null) {
functionConfig.getCustomSerdeInputs().forEach((topicName,
inputSerializer) -> {
- validateSerde(inputSerializer, typeArgs[0], name,
clsLoader);
+ validateSerde(inputSerializer, typeArgs[0], name,
clsLoader, true);
});
}
@@ -389,7 +389,7 @@ public class ValidatorImpls {
// implements SerDe class
if (functionConfig.getCustomSchemaInputs() != null) {
functionConfig.getCustomSchemaInputs().forEach((topicName,
schemaType) -> {
- validateSchema(schemaType, typeArgs[0], name, clsLoader);
+ validateSchema(schemaType, typeArgs[0], name, clsLoader,
true);
});
}
@@ -405,10 +405,10 @@ public class ValidatorImpls {
String.format("Only one of schemaType or
serdeClassName should be set in inputSpec"));
}
if (conf.getSerdeClassName() != null &&
!conf.getSerdeClassName().isEmpty()) {
- validateSerde(conf.getSerdeClassName(), typeArgs[0],
name, clsLoader);
+ validateSerde(conf.getSerdeClassName(), typeArgs[0],
name, clsLoader, true);
}
if (conf.getSchemaType() != null &&
!conf.getSchemaType().isEmpty()) {
- validateSchema(conf.getSchemaType(), typeArgs[0],
name, clsLoader);
+ validateSchema(conf.getSchemaType(), typeArgs[0],
name, clsLoader, true);
}
});
}
@@ -425,16 +425,17 @@ public class ValidatorImpls {
}
if (functionConfig.getOutputSchemaType() != null &&
!functionConfig.getOutputSchemaType().isEmpty()) {
- validateSchema(functionConfig.getOutputSchemaType(),
typeArgs[1], name, clsLoader);
+ validateSchema(functionConfig.getOutputSchemaType(),
typeArgs[1], name, clsLoader, false);
}
if (functionConfig.getOutputSerdeClassName() != null &&
!functionConfig.getOutputSerdeClassName().isEmpty()) {
- validateSerde(functionConfig.getOutputSerdeClassName(),
typeArgs[1], name, clsLoader);
+ validateSerde(functionConfig.getOutputSerdeClassName(),
typeArgs[1], name, clsLoader, false);
}
}
- private static void validateSchema(String schemaType, Class<?>
typeArg, String name, ClassLoader clsLoader) {
+ private static void validateSchema(String schemaType, Class<?>
typeArg, String name, ClassLoader clsLoader,
+ boolean input) {
if (StringUtils.isEmpty(schemaType) ||
getBuiltinSchemaType(schemaType) != null) {
// If it's empty, we use the default schema and no need to
validate
// If it's built-in, no need to validate
@@ -447,11 +448,12 @@ public class ValidatorImpls {
schemaType,
Schema.class.getCanonicalName()));
}
- validateSchemaType(schemaType, typeArg, clsLoader);
+ validateSchemaType(schemaType, typeArg, clsLoader, input);
}
}
- private static void validateSerde(String inputSerializer, Class<?>
typeArg, String name, ClassLoader clsLoader) {
+ private static void validateSerde(String inputSerializer, Class<?>
typeArg, String name, ClassLoader clsLoader,
+ boolean deser) {
if (StringUtils.isEmpty(inputSerializer)) return;
Class<?> serdeClass;
try {
@@ -492,8 +494,14 @@ public class ValidatorImpls {
throw new IllegalArgumentException("Failed to load type
class", e);
}
- if (!fnInputClass.isAssignableFrom(serdeInputClass)) {
- throw new IllegalArgumentException("Serializer type
mismatch " + typeArg + " vs " + serDeTypes[0]);
+ if (deser) {
+ if (!fnInputClass.isAssignableFrom(serdeInputClass)) {
+ throw new IllegalArgumentException("Serializer type
mismatch " + typeArg + " vs " + serDeTypes[0]);
+ }
+ } else {
+ if (!serdeInputClass.isAssignableFrom(fnInputClass)) {
+ throw new IllegalArgumentException("Serializer type
mismatch " + typeArg + " vs " + serDeTypes[0]);
+ }
}
}
}
@@ -734,10 +742,10 @@ public class ValidatorImpls {
}
if (sourceConfig.getSerdeClassName() != null &&
!sourceConfig.getSerdeClassName().isEmpty()) {
-
FunctionConfigValidator.validateSerde(sourceConfig.getSerdeClassName(),typeArg,
name, clsLoader);
+
FunctionConfigValidator.validateSerde(sourceConfig.getSerdeClassName(),typeArg,
name, clsLoader, false);
}
if (sourceConfig.getSchemaType() != null &&
!sourceConfig.getSchemaType().isEmpty()) {
-
FunctionConfigValidator.validateSchema(sourceConfig.getSchemaType(), typeArg,
name, clsLoader);
+
FunctionConfigValidator.validateSchema(sourceConfig.getSchemaType(), typeArg,
name, clsLoader, false);
}
} catch (IOException e) {
throw new IllegalArgumentException(e);
@@ -774,13 +782,13 @@ public class ValidatorImpls {
if (sinkConfig.getTopicToSerdeClassName() != null) {
sinkConfig.getTopicToSerdeClassName().forEach((topicName,
serdeClassName) -> {
- FunctionConfigValidator.validateSerde(serdeClassName,
typeArg, name, clsLoader);
+ FunctionConfigValidator.validateSerde(serdeClassName,
typeArg, name, clsLoader, true);
});
}
if (sinkConfig.getTopicToSchemaType() != null) {
sinkConfig.getTopicToSchemaType().forEach((topicName,
schemaType) -> {
- FunctionConfigValidator.validateSchema(schemaType,
typeArg, name, clsLoader);
+ FunctionConfigValidator.validateSchema(schemaType,
typeArg, name, clsLoader, true);
});
}
@@ -794,10 +802,10 @@ public class ValidatorImpls {
throw new IllegalArgumentException("Only one of
serdeClassName or schemaType should be set");
}
if (consumerSpec.getSerdeClassName() != null &&
!consumerSpec.getSerdeClassName().isEmpty()) {
-
FunctionConfigValidator.validateSerde(consumerSpec.getSerdeClassName(),
typeArg, name, clsLoader);
+
FunctionConfigValidator.validateSerde(consumerSpec.getSerdeClassName(),
typeArg, name, clsLoader, true);
}
if (consumerSpec.getSchemaType() != null &&
!consumerSpec.getSchemaType().isEmpty()) {
-
FunctionConfigValidator.validateSchema(consumerSpec.getSchemaType(), typeArg,
name, clsLoader);
+
FunctionConfigValidator.validateSchema(consumerSpec.getSchemaType(), typeArg,
name, clsLoader, true);
}
});
}
@@ -900,8 +908,8 @@ public class ValidatorImpls {
}
}
- private static void validateSchemaType(String scheamType, Class<?>
typeArg, ClassLoader clsLoader) {
- validateCustomSchemaType(scheamType, typeArg, clsLoader);
+ private static void validateSchemaType(String scheamType, Class<?>
typeArg, ClassLoader clsLoader, boolean input) {
+ validateCustomSchemaType(scheamType, typeArg, clsLoader, input);
}
private static void validateSerDeType(String serdeClassName, Class<?>
typeArg, ClassLoader clsLoader) {
@@ -929,7 +937,8 @@ public class ValidatorImpls {
}
}
- private static void validateCustomSchemaType(String schemaClassName,
Class<?> typeArg, ClassLoader clsLoader) {
+ private static void validateCustomSchemaType(String schemaClassName,
Class<?> typeArg, ClassLoader clsLoader,
+ boolean input) {
Schema<?> schema = (Schema<?>)
Reflections.createInstance(schemaClassName, clsLoader);
if (schema == null) {
throw new IllegalArgumentException(String.format("The Schema class
%s does not exist",
@@ -948,9 +957,16 @@ public class ValidatorImpls {
throw new IllegalArgumentException("Failed to load type class", e);
}
- if (!fnInputClass.isAssignableFrom(schemaInputClass)) {
- throw new IllegalArgumentException(
- "Schema type mismatch " + typeArg + " vs " +
schemaTypes[0]);
+ if (input) {
+ if (!fnInputClass.isAssignableFrom(schemaInputClass)) {
+ throw new IllegalArgumentException(
+ "Schema type mismatch " + typeArg + " vs " +
schemaTypes[0]);
+ }
+ } else {
+ if (!schemaInputClass.isAssignableFrom(fnInputClass)) {
+ throw new IllegalArgumentException(
+ "Schema type mismatch " + typeArg + " vs " +
schemaTypes[0]);
+ }
}
}
}
\ No newline at end of file