This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.8 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 89ac98e4af363b09f2fe8e309539b0e35243aaee Author: Boyang Jerry Peng <[email protected]> AuthorDate: Sat Jun 12 00:25:37 2021 -0700 Remove the unwanted dependencies in the pulsar function's instance jar and make SchemaInfo an interface (#10878) ### Motivation The java-instance.jar generated by the pulsar-functions-runtime-all module should only contain interfaces that Pulsar Function's framework uses to interact with user code. The module should only have the following dependencies: 1. pulsar-io-core 2. pulsar-functions-api 3. pulsar-client-api 4. slf4j-api 5. log4j-slf4j-impl 6. log4j-api 7. log4j-core *Explain here the context, and why you're making that change. What is the problem you're trying to solve.* ### Modifications Change dep pulsar-client-original to pulsar-client-api. Slight changes in the top level pom for what is included in all sub-modules so that additional deps don't land into java-instance.jar. There is also a fix for an issue introduced by https://github.com/apache/pulsar/pull/9673. The thread context class loader was set incorrectly in ThreadRuntime. ### Future improvements 1. We should also add a test in the future to make sure external libraries don't get added accidentally to this module and java-instance.jar 2. Rename the module "pulsar-functions-runtime-all" to something that describes its function better. 
The current name can be confusing (cherry picked from commit d81b5f8b8e6cb17f307ec830accaf9dd95d7643b) --- distribution/server/pom.xml | 6 ++ pom.xml | 9 +-- .../apache/pulsar/broker/service/ServerCnx.java | 2 +- .../admin/AdminApiSchemaValidationEnforced.java | 34 ++++---- .../schema/JsonSchemaCompatibilityCheckTest.java | 12 +-- .../java/org/apache/pulsar/schema/SchemaTest.java | 5 +- .../SchemaCompatibilityCheckTest.java | 3 +- .../pulsar/client/admin/internal/SchemasImpl.java | 15 ++-- .../apache/pulsar/common/schema/SchemaInfo.java | 52 ++---------- .../org/apache/pulsar/client/impl/ClientCnx.java | 2 +- .../pulsar/client/impl/HttpLookupService.java | 2 +- .../client/impl/schema/AutoProduceBytesSchema.java | 7 +- .../pulsar/client/impl/schema/BooleanSchema.java | 2 +- .../pulsar/client/impl/schema/ByteBufSchema.java | 2 +- .../client/impl/schema/ByteBufferSchema.java | 2 +- .../pulsar/client/impl/schema/ByteSchema.java | 2 +- .../pulsar/client/impl/schema/BytesSchema.java | 2 +- .../pulsar/client/impl/schema/DateSchema.java | 2 +- .../pulsar/client/impl/schema/DoubleSchema.java | 2 +- .../pulsar/client/impl/schema/FloatSchema.java | 2 +- .../pulsar/client/impl/schema/InstantSchema.java | 2 +- .../pulsar/client/impl/schema/IntSchema.java | 2 +- .../pulsar/client/impl/schema/JSONSchema.java | 10 +-- .../pulsar/client/impl/schema/LocalDateSchema.java | 2 +- .../client/impl/schema/LocalDateTimeSchema.java | 2 +- .../pulsar/client/impl/schema/LocalTimeSchema.java | 2 +- .../pulsar/client/impl/schema/LongSchema.java | 2 +- .../client/impl/schema/ProtobufNativeSchema.java | 7 +- .../pulsar/client/impl/schema/ProtobufSchema.java | 7 +- .../impl/schema/RecordSchemaBuilderImpl.java | 2 +- .../pulsar/client/impl}/schema/SchemaInfoUtil.java | 48 ++++++----- .../pulsar/client/impl/schema/ShortSchema.java | 2 +- .../pulsar/client/impl/schema/StringSchema.java | 4 +- .../pulsar/client/impl/schema/StructSchema.java | 2 +- .../pulsar/client/impl/schema/TimeSchema.java | 2 +- 
.../pulsar/client/impl/schema/TimestampSchema.java | 2 +- .../pulsar/client/impl/schema/util/SchemaUtil.java | 3 +- .../src/main/resources/findbugsExclude.xml | 16 ++++ .../client/impl/schema/KeyValueSchemaInfoTest.java | 2 +- .../client/impl/schema/KeyValueSchemaTest.java | 31 ++++--- .../pulsar/client/impl/schema/SchemaInfoTest.java | 16 ++-- .../client/impl/schema/StringSchemaTest.java | 4 +- pulsar-common/pom.xml | 4 + .../admin/internal/data/AuthPoliciesImpl.java | 20 +++-- .../client/impl/schema/KeyValueSchemaInfo.java | 15 ++-- .../pulsar/client/impl/schema/SchemaInfoImpl.java | 18 ++--- .../pulsar/client/impl/schema/SchemaUtils.java | 0 .../pulsar/common/protocol/schema/SchemaData.java | 3 +- pulsar-functions/runtime-all/pom.xml | 30 ++++++- .../functions/instance/JavaInstanceMain.java | 1 + .../functions/instance/JavaInstanceDepsTest.java | 77 ++++++++++++++++++ .../functions/runtime/thread/ThreadRuntime.java | 1 - .../connect/schema/KafkaSchemaWrappedSchema.java | 3 +- .../apache/pulsar/io/kafka/AvroSchemaCache.java | 3 +- .../apache/pulsar/io/kafka/KafkaBytesSource.java | 3 +- pulsar-sql/presto-distribution/LICENSE | 1 - .../org/apache/pulsar/sql/presto/PulsarSplit.java | 3 +- .../pulsar/sql/presto/TestPulsarMetadata.java | 15 ++-- .../decoder/primitive/TestPrimitiveDecoder.java | 25 +++--- tests/docker-images/java-test-functions/pom.xml | 18 +---- .../io/sources/GenericRecordSourceTest.java | 94 ++++++++++++++-------- 61 files changed, 406 insertions(+), 263 deletions(-) diff --git a/distribution/server/pom.xml b/distribution/server/pom.xml index 57ae871..27d12c4 100644 --- a/distribution/server/pom.xml +++ b/distribution/server/pom.xml @@ -65,6 +65,12 @@ </dependency> <dependency> + <groupId>org.apache.zookeeper</groupId> + <artifactId>zookeeper-prometheus-metrics</artifactId> + <version>${zookeeper.version}</version> + </dependency> + + <dependency> <groupId>${project.groupId}</groupId> <artifactId>pulsar-package-bookkeeper-storage</artifactId> 
<version>${project.version}</version> diff --git a/pom.xml b/pom.xml index f4ce36c..c98a55a 100644 --- a/pom.xml +++ b/pom.xml @@ -1199,13 +1199,12 @@ flexible messaging model and an intuitive client API.</description> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>*</artifactId> </exclusion> + <exclusion> + <groupId>org.apache.zookeeper</groupId> + <artifactId>*</artifactId> + </exclusion> </exclusions> </dependency> - <dependency> - <groupId>org.apache.zookeeper</groupId> - <artifactId>zookeeper-prometheus-metrics</artifactId> - <version>${zookeeper.version}</version> - </dependency> </dependencies> <build> diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index 2f3d32d..4445f64 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -83,6 +83,7 @@ import org.apache.pulsar.client.api.transaction.TxnID; import org.apache.pulsar.client.impl.BatchMessageIdImpl; import org.apache.pulsar.client.impl.ClientCnx; import org.apache.pulsar.client.impl.MessageIdImpl; +import org.apache.pulsar.client.impl.schema.SchemaInfoUtil; import org.apache.pulsar.common.api.AuthData; import org.apache.pulsar.common.api.proto.BaseCommand; import org.apache.pulsar.common.api.proto.CommandAck; @@ -135,7 +136,6 @@ import org.apache.pulsar.common.protocol.CommandUtils; import org.apache.pulsar.common.protocol.Commands; import org.apache.pulsar.common.protocol.PulsarHandler; import org.apache.pulsar.common.protocol.schema.SchemaData; -import org.apache.pulsar.common.protocol.schema.SchemaInfoUtil; import org.apache.pulsar.common.protocol.schema.SchemaVersion; import org.apache.pulsar.common.schema.SchemaType; import org.apache.pulsar.common.util.FutureUtil; diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaValidationEnforced.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaValidationEnforced.java index 1ebf7f1..3daf920 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaValidationEnforced.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaValidationEnforced.java @@ -30,6 +30,7 @@ import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.impl.schema.SchemaInfoImpl; import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.ClusterDataImpl; import org.apache.pulsar.common.policies.data.TenantInfoImpl; @@ -97,11 +98,12 @@ public class AdminApiSchemaValidationEnforced extends MockedPulsarServiceBaseTes assertTrue(e.getMessage().contains("HTTP 404 Not Found")); } Map<String, String> properties = Maps.newHashMap(); - SchemaInfo schemaInfo = new SchemaInfo(); - schemaInfo.setType(SchemaType.STRING); - schemaInfo.setProperties(properties); - schemaInfo.setName("test"); - schemaInfo.setSchema("".getBytes()); + SchemaInfo schemaInfo = SchemaInfoImpl.builder() + .type(SchemaType.STRING) + .properties(properties) + .name("test") + .schema("".getBytes()) + .build(); PostSchemaPayload postSchemaPayload = new PostSchemaPayload("STRING", "", properties); admin.schemas().createSchema(topicName, postSchemaPayload); try (Producer p = pulsarClient.newProducer().topic(topicName).create()) { @@ -145,11 +147,12 @@ public class AdminApiSchemaValidationEnforced extends MockedPulsarServiceBaseTes } Map<String, String> properties = Maps.newHashMap(); properties.put("key1", "value1"); - SchemaInfo schemaInfo = new SchemaInfo(); - schemaInfo.setType(SchemaType.STRING); - 
schemaInfo.setProperties(properties); - schemaInfo.setName("test"); - schemaInfo.setSchema("".getBytes()); + SchemaInfo schemaInfo = SchemaInfoImpl.builder() + .type(SchemaType.STRING) + .properties(properties) + .name("test") + .schema("".getBytes()) + .build(); PostSchemaPayload postSchemaPayload = new PostSchemaPayload("STRING", "", properties); admin.schemas().createSchema(topicName, postSchemaPayload); try (Producer p = pulsarClient.newProducer().topic(topicName).create()) { @@ -174,11 +177,12 @@ public class AdminApiSchemaValidationEnforced extends MockedPulsarServiceBaseTes } admin.namespaces().setSchemaValidationEnforced(namespace,true); Map<String, String> properties = Maps.newHashMap(); - SchemaInfo schemaInfo = new SchemaInfo(); - schemaInfo.setType(SchemaType.STRING); - schemaInfo.setProperties(properties); - schemaInfo.setName("test"); - schemaInfo.setSchema("".getBytes()); + SchemaInfo schemaInfo = SchemaInfoImpl.builder() + .type(SchemaType.STRING) + .properties(properties) + .name("test") + .schema("".getBytes()) + .build(); PostSchemaPayload postSchemaPayload = new PostSchemaPayload("STRING", "", properties); admin.schemas().createSchema(topicName, postSchemaPayload); try (Producer<String> p = pulsarClient.newProducer(Schema.STRING).topic(topicName).create()) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/JsonSchemaCompatibilityCheckTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/JsonSchemaCompatibilityCheckTest.java index 04914fe..32a9f9e 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/JsonSchemaCompatibilityCheckTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/JsonSchemaCompatibilityCheckTest.java @@ -33,6 +33,7 @@ import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.SchemaSerializationException; import org.apache.pulsar.client.api.schema.SchemaDefinition; import 
org.apache.pulsar.client.impl.schema.JSONSchema; +import org.apache.pulsar.client.impl.schema.SchemaInfoImpl; import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy; import org.apache.pulsar.common.protocol.schema.SchemaData; import org.apache.pulsar.common.schema.SchemaInfo; @@ -118,11 +119,12 @@ public class JsonSchemaCompatibilityCheckTest extends BaseAvroSchemaCompatibilit JsonSchemaGenerator schemaGen = new JsonSchemaGenerator(mapper); JsonSchema schema = schemaGen.generateSchema(pojo); - SchemaInfo info = new SchemaInfo(); - info.setName(""); - info.setProperties(properties); - info.setType(SchemaType.JSON); - info.setSchema(mapper.writeValueAsBytes(schema)); + SchemaInfo info = SchemaInfoImpl.builder() + .name("") + .properties(properties) + .type(SchemaType.JSON) + .schema(mapper.writeValueAsBytes(schema)) + .build(); return new OldJSONSchema<>(info, pojo, mapper); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java index d97b989..6e860ad 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java @@ -61,6 +61,7 @@ import org.apache.pulsar.client.api.TypedMessageBuilder; import org.apache.pulsar.client.api.schema.GenericRecord; import org.apache.pulsar.client.api.schema.SchemaDefinition; import org.apache.pulsar.client.impl.schema.KeyValueSchemaImpl; +import org.apache.pulsar.client.impl.schema.SchemaInfoImpl; import org.apache.pulsar.client.impl.schema.generic.GenericJsonRecord; import org.apache.pulsar.common.naming.TopicDomain; import org.apache.pulsar.common.naming.TopicName; @@ -437,7 +438,7 @@ public class SchemaTest extends MockedPulsarServiceBaseTest { admin.topics().createPartitionedTopic(topic, 2); // set schema - SchemaInfo schemaInfo = SchemaInfo + SchemaInfo schemaInfo = SchemaInfoImpl .builder() .schema(new byte[0]) 
.name("dummySchema") @@ -653,7 +654,7 @@ public class SchemaTest extends MockedPulsarServiceBaseTest { final Map<String, String> map = new HashMap<>(); map.put("key", null); map.put(null, "value"); // null key is not allowed for JSON, it's only for test here - Schema.INT32.getSchemaInfo().setProperties(map); + ((SchemaInfoImpl)Schema.INT32.getSchemaInfo()).setProperties(map); final Consumer<Integer> consumer = pulsarClient.newConsumer(Schema.INT32).topic(topic) .subscriptionName("sub") diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaCompatibilityCheckTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaCompatibilityCheckTest.java index d6d96f7..61d8332 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaCompatibilityCheckTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaCompatibilityCheckTest.java @@ -35,6 +35,7 @@ import org.apache.pulsar.client.api.ProducerBuilder; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.SchemaSerializationException; import org.apache.pulsar.client.api.schema.SchemaDefinition; +import org.apache.pulsar.client.impl.schema.SchemaInfoImpl; import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.TopicDomain; import org.apache.pulsar.common.naming.TopicName; @@ -323,7 +324,7 @@ public class SchemaCompatibilityCheckTest extends MockedPulsarServiceBaseTest { SchemaCompatibilityStrategy.FULL); byte[] changeSchemaBytes = (new String(Schema.AVRO(Schemas.PersonOne.class) .getSchemaInfo().getSchema(), UTF_8) + "/n /n /n").getBytes(); - SchemaInfo schemaInfo = SchemaInfo.builder().type(SchemaType.AVRO).schema(changeSchemaBytes).build(); + SchemaInfo schemaInfo = SchemaInfoImpl.builder().type(SchemaType.AVRO).schema(changeSchemaBytes).build(); admin.schemas().createSchema(fqtn, schemaInfo); 
admin.namespaces().setIsAllowAutoUpdateSchema(namespaceName.toString(), false); diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SchemasImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SchemasImpl.java index 9a9a4ed..4408ae2 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SchemasImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SchemasImpl.java @@ -31,6 +31,7 @@ import javax.ws.rs.client.WebTarget; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.admin.Schemas; import org.apache.pulsar.client.api.Authentication; +import org.apache.pulsar.client.impl.schema.SchemaInfoImpl; import org.apache.pulsar.client.internal.DefaultImplementation; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.protocol.schema.DeleteSchemaResponse; @@ -434,7 +435,7 @@ public class SchemasImpl extends BaseResource implements Schemas { // the util function converts `GetSchemaResponse` to `SchemaInfo` static SchemaInfo convertGetSchemaResponseToSchemaInfo(TopicName tn, GetSchemaResponse response) { - SchemaInfo info = new SchemaInfo(); + byte[] schema; if (response.getType() == SchemaType.KEY_VALUE) { schema = DefaultImplementation.convertKeyValueDataStringToSchemaInfoSchema( @@ -442,11 +443,13 @@ public class SchemasImpl extends BaseResource implements Schemas { } else { schema = response.getData().getBytes(UTF_8); } - info.setSchema(schema); - info.setType(response.getType()); - info.setProperties(response.getProperties()); - info.setName(tn.getLocalName()); - return info; + + return SchemaInfoImpl.builder() + .schema(schema) + .type(response.getType()) + .properties(response.getProperties()) + .name(tn.getLocalName()) + .build(); } static SchemaInfoWithVersion convertGetSchemaResponseToSchemaInfoWithVersion(TopicName tn, diff --git 
a/pulsar-client-api/src/main/java/org/apache/pulsar/common/schema/SchemaInfo.java b/pulsar-client-api/src/main/java/org/apache/pulsar/common/schema/SchemaInfo.java index f2c5860..0070c4c 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/common/schema/SchemaInfo.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/common/schema/SchemaInfo.java @@ -18,16 +18,7 @@ */ package org.apache.pulsar.common.schema; -import static java.nio.charset.StandardCharsets.UTF_8; -import java.util.Base64; -import java.util.Collections; import java.util.Map; -import lombok.AllArgsConstructor; -import lombok.Builder; -import lombok.Data; -import lombok.EqualsAndHashCode; -import lombok.NoArgsConstructor; -import lombok.experimental.Accessors; import org.apache.pulsar.client.internal.DefaultImplementation; import org.apache.pulsar.common.classification.InterfaceAudience; import org.apache.pulsar.common.classification.InterfaceStability; @@ -37,55 +28,24 @@ import org.apache.pulsar.common.classification.InterfaceStability; */ @InterfaceAudience.Public @InterfaceStability.Stable -@Data -@AllArgsConstructor -@NoArgsConstructor -@Accessors(chain = true) -@Builder -public class SchemaInfo { +public interface SchemaInfo { - @EqualsAndHashCode.Exclude - private String name; + String getName(); /** * The schema data in AVRO JSON format. */ - private byte[] schema; + byte[] getSchema(); /** * The type of schema (AVRO, JSON, PROTOBUF, etc..). */ - private SchemaType type; + SchemaType getType(); /** * Additional properties of the schema definition (implementation defined). 
*/ - @Builder.Default - private Map<String, String> properties = Collections.emptyMap(); - - public String getSchemaDefinition() { - if (null == schema) { - return ""; - } - - switch (type) { - case AVRO: - case JSON: - case PROTOBUF: - case PROTOBUF_NATIVE: - return new String(schema, UTF_8); - case KEY_VALUE: - KeyValue<SchemaInfo, SchemaInfo> schemaInfoKeyValue = - DefaultImplementation.decodeKeyValueSchemaInfo(this); - return DefaultImplementation.jsonifyKeyValueSchemaInfo(schemaInfoKeyValue); - default: - return Base64.getEncoder().encodeToString(schema); - } - } - - @Override - public String toString(){ - return DefaultImplementation.jsonifySchemaInfo(this); - } + Map<String, String> getProperties(); + String getSchemaDefinition(); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java index a7cc68e..bed2b9c 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java @@ -90,7 +90,7 @@ import org.apache.pulsar.common.api.proto.CommandSuccess; import org.apache.pulsar.common.api.proto.ServerError; import org.apache.pulsar.common.protocol.Commands; import org.apache.pulsar.common.protocol.PulsarHandler; -import org.apache.pulsar.common.protocol.schema.SchemaInfoUtil; +import org.apache.pulsar.client.impl.schema.SchemaInfoUtil; import org.apache.pulsar.common.protocol.schema.SchemaVersion; import org.apache.pulsar.common.schema.SchemaInfo; import org.apache.pulsar.common.util.FutureUtil; diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java index 9108a6e..f2cc169 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java +++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java @@ -47,7 +47,7 @@ import org.apache.pulsar.common.partition.PartitionedTopicMetadata; import org.apache.pulsar.common.protocol.schema.GetSchemaResponse; import org.apache.pulsar.common.protocol.schema.SchemaData; import org.apache.pulsar.common.schema.SchemaInfo; -import org.apache.pulsar.common.protocol.schema.SchemaInfoUtil; +import org.apache.pulsar.client.impl.schema.SchemaInfoUtil; import org.apache.pulsar.common.schema.SchemaType; import org.apache.pulsar.common.util.Codec; import org.apache.pulsar.common.util.FutureUtil; diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoProduceBytesSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoProduceBytesSchema.java index 3db9554..8971aab 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoProduceBytesSchema.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoProduceBytesSchema.java @@ -21,6 +21,7 @@ package org.apache.pulsar.client.impl.schema; import static com.google.common.base.Preconditions.checkState; import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.api.schema.KeyValueSchema; import org.apache.pulsar.common.schema.KeyValueEncodingType; import org.apache.pulsar.common.schema.SchemaInfo; import org.apache.pulsar.common.schema.SchemaType; @@ -74,9 +75,9 @@ public class AutoProduceBytesSchema<T> implements Schema<byte[]> { if (requireSchemaValidation) { // verify if the message can be decoded by the underlying schema - if (schema instanceof KeyValueSchemaImpl - && ((KeyValueSchemaImpl) schema).getKeyValueEncodingType().equals(KeyValueEncodingType.SEPARATED)) { - ((KeyValueSchemaImpl) schema).getValueSchema().validate(message); + if (schema instanceof KeyValueSchema + && ((KeyValueSchema) schema).getKeyValueEncodingType().equals(KeyValueEncodingType.SEPARATED)) { + ((KeyValueSchema) 
schema).getValueSchema().validate(message); } else { schema.validate(message); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/BooleanSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/BooleanSchema.java index c66ff43..3b5296e 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/BooleanSchema.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/BooleanSchema.java @@ -32,7 +32,7 @@ public class BooleanSchema extends AbstractSchema<Boolean> { private static final SchemaInfo SCHEMA_INFO; static { - SCHEMA_INFO = new SchemaInfo() + SCHEMA_INFO = new SchemaInfoImpl() .setName("Boolean") .setType(SchemaType.BOOLEAN) .setSchema(new byte[0]); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ByteBufSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ByteBufSchema.java index 658e398..ce68298 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ByteBufSchema.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ByteBufSchema.java @@ -33,7 +33,7 @@ public class ByteBufSchema extends AbstractSchema<ByteBuf> { private static final SchemaInfo SCHEMA_INFO; static { - SCHEMA_INFO = new SchemaInfo() + SCHEMA_INFO = new SchemaInfoImpl() .setName("ByteBuf") .setType(SchemaType.BYTES) .setSchema(new byte[0]); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ByteBufferSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ByteBufferSchema.java index c560f0e..0ff308f 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ByteBufferSchema.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ByteBufferSchema.java @@ -34,7 +34,7 @@ public class ByteBufferSchema extends AbstractSchema<ByteBuffer> { private static final SchemaInfo SCHEMA_INFO; static { - SCHEMA_INFO = new SchemaInfo() + 
SCHEMA_INFO = new SchemaInfoImpl() .setName("ByteBuffer") .setType(SchemaType.BYTES) .setSchema(new byte[0]); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ByteSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ByteSchema.java index 4e4c27e..6d51687 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ByteSchema.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ByteSchema.java @@ -32,7 +32,7 @@ public class ByteSchema extends AbstractSchema<Byte> { private static final SchemaInfo SCHEMA_INFO; static { - SCHEMA_INFO = new SchemaInfo() + SCHEMA_INFO = new SchemaInfoImpl() .setName("INT8") .setType(SchemaType.INT8) .setSchema(new byte[0]); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/BytesSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/BytesSchema.java index 9c7ec37..98a0e66 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/BytesSchema.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/BytesSchema.java @@ -31,7 +31,7 @@ public class BytesSchema extends AbstractSchema<byte[]> { private static final SchemaInfo SCHEMA_INFO; static { - SCHEMA_INFO = new SchemaInfo() + SCHEMA_INFO = new SchemaInfoImpl() .setName("Bytes") .setType(SchemaType.BYTES) .setSchema(new byte[0]); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/DateSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/DateSchema.java index 295dae6..cbdb912 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/DateSchema.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/DateSchema.java @@ -33,7 +33,7 @@ public class DateSchema extends AbstractSchema<Date> { private static final SchemaInfo SCHEMA_INFO; static { - SCHEMA_INFO = new SchemaInfo() + SCHEMA_INFO = new SchemaInfoImpl() 
.setName("Date") .setType(SchemaType.DATE) .setSchema(new byte[0]); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/DoubleSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/DoubleSchema.java index baa1aac..4b269a6 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/DoubleSchema.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/DoubleSchema.java @@ -32,7 +32,7 @@ public class DoubleSchema extends AbstractSchema<Double> { private static final SchemaInfo SCHEMA_INFO; static { - SCHEMA_INFO = new SchemaInfo() + SCHEMA_INFO = new SchemaInfoImpl() .setName("Double") .setType(SchemaType.DOUBLE) .setSchema(new byte[0]); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/FloatSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/FloatSchema.java index aed905b..84d4073 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/FloatSchema.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/FloatSchema.java @@ -32,7 +32,7 @@ public class FloatSchema extends AbstractSchema<Float> { private static final SchemaInfo SCHEMA_INFO; static { - SCHEMA_INFO = new SchemaInfo() + SCHEMA_INFO = new SchemaInfoImpl() .setName("Float") .setType(SchemaType.FLOAT) .setSchema(new byte[0]); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/InstantSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/InstantSchema.java index 5830cea..db33de7 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/InstantSchema.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/InstantSchema.java @@ -33,7 +33,7 @@ public class InstantSchema extends AbstractSchema<Instant> { private static final SchemaInfo SCHEMA_INFO; static { - SCHEMA_INFO = new SchemaInfo() + SCHEMA_INFO = new SchemaInfoImpl() .setName("Instant") 
.setType(SchemaType.INSTANT) .setSchema(new byte[0]); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/IntSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/IntSchema.java index fc8338e..dfad280 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/IntSchema.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/IntSchema.java @@ -32,7 +32,7 @@ public class IntSchema extends AbstractSchema<Integer> { private static final SchemaInfo SCHEMA_INFO; static { - SCHEMA_INFO = new SchemaInfo() + SCHEMA_INFO = new SchemaInfoImpl() .setName("INT32") .setType(SchemaType.INT32) .setSchema(new byte[0]); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/JSONSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/JSONSchema.java index 4e3b874..9fe6aed 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/JSONSchema.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/JSONSchema.java @@ -74,11 +74,11 @@ public class JSONSchema<T> extends AvroBaseStructSchema<T> { ObjectMapper objectMapper = new ObjectMapper(); JsonSchemaGenerator schemaGen = new JsonSchemaGenerator(objectMapper); JsonSchema jsonBackwardsCompatibleSchema = schemaGen.generateSchema(pojo); - backwardsCompatibleSchemaInfo = new SchemaInfo(); - backwardsCompatibleSchemaInfo.setName(""); - backwardsCompatibleSchemaInfo.setProperties(schemaInfo.getProperties()); - backwardsCompatibleSchemaInfo.setType(SchemaType.JSON); - backwardsCompatibleSchemaInfo.setSchema(objectMapper.writeValueAsBytes(jsonBackwardsCompatibleSchema)); + backwardsCompatibleSchemaInfo = new SchemaInfoImpl() + .setName("") + .setProperties(schemaInfo.getProperties()) + .setType(SchemaType.JSON) + .setSchema(objectMapper.writeValueAsBytes(jsonBackwardsCompatibleSchema)); } catch (JsonProcessingException ex) { throw new RuntimeException(ex); } diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/LocalDateSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/LocalDateSchema.java index add6fd2..18ef3af 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/LocalDateSchema.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/LocalDateSchema.java @@ -32,7 +32,7 @@ public class LocalDateSchema extends AbstractSchema<LocalDate> { private static final SchemaInfo SCHEMA_INFO; static { - SCHEMA_INFO = new SchemaInfo() + SCHEMA_INFO = new SchemaInfoImpl() .setName("LocalDate") .setType(SchemaType.LOCAL_DATE) .setSchema(new byte[0]); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/LocalDateTimeSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/LocalDateTimeSchema.java index aa86a19..05b2787 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/LocalDateTimeSchema.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/LocalDateTimeSchema.java @@ -37,7 +37,7 @@ public class LocalDateTimeSchema extends AbstractSchema<LocalDateTime> { public static final String DELIMITER = ":"; static { - SCHEMA_INFO = new SchemaInfo() + SCHEMA_INFO = new SchemaInfoImpl() .setName("LocalDateTime") .setType(SchemaType.LOCAL_DATE_TIME) .setSchema(new byte[0]); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/LocalTimeSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/LocalTimeSchema.java index 6e2bf62..e53c620 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/LocalTimeSchema.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/LocalTimeSchema.java @@ -32,7 +32,7 @@ public class LocalTimeSchema extends AbstractSchema<LocalTime> { private static final SchemaInfo SCHEMA_INFO; static { - SCHEMA_INFO = new SchemaInfo() + SCHEMA_INFO = new 
SchemaInfoImpl() .setName("LocalTime") .setType(SchemaType.LOCAL_TIME) .setSchema(new byte[0]); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/LongSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/LongSchema.java index f1491f4..deccaf4 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/LongSchema.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/LongSchema.java @@ -32,7 +32,7 @@ public class LongSchema extends AbstractSchema<Long> { private static final SchemaInfo SCHEMA_INFO; static { - SCHEMA_INFO = new SchemaInfo() + SCHEMA_INFO = new SchemaInfoImpl() .setName("INT64") .setType(SchemaType.INT64) .setSchema(new byte[0]); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ProtobufNativeSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ProtobufNativeSchema.java index 385fc41..9cf753c 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ProtobufNativeSchema.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ProtobufNativeSchema.java @@ -72,11 +72,10 @@ public class ProtobufNativeSchema<T extends GeneratedMessageV3> extends Abstract setReader(new ProtobufNativeReader<>(protoMessageInstance)); setWriter(new ProtobufNativeWriter<>()); // update properties with protobuf related properties - Map<String, String> allProperties = new HashMap<>(); - allProperties.putAll(schemaInfo.getProperties()); // set protobuf parsing info + Map<String, String> allProperties = new HashMap<>(schemaInfo.getProperties()); allProperties.put(PARSING_INFO_PROPERTY, getParsingInfo(protoMessageInstance)); - schemaInfo.setProperties(allProperties); + ((SchemaInfoImpl)schemaInfo).setProperties(allProperties); } private String getParsingInfo(T protoMessageInstance) { @@ -124,7 +123,7 @@ public class ProtobufNativeSchema<T extends GeneratedMessageV3> extends Abstract } 
Descriptors.Descriptor descriptor = createProtobufNativeSchema(schemaDefinition.getPojo()); - SchemaInfo schemaInfo = SchemaInfo.builder() + SchemaInfo schemaInfo = SchemaInfoImpl.builder() .schema(ProtobufNativeSchemaUtils.serialize(descriptor)) .type(SchemaType.PROTOBUF_NATIVE) .name("") diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ProtobufSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ProtobufSchema.java index f7971eb..275cacd 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ProtobufSchema.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ProtobufSchema.java @@ -69,11 +69,10 @@ public class ProtobufSchema<T extends com.google.protobuf.GeneratedMessageV3> ex setReader(new ProtobufReader<>(protoMessageInstance)); setWriter(new ProtobufWriter<>()); // update properties with protobuf related properties - Map<String, String> allProperties = new HashMap<>(); - allProperties.putAll(schemaInfo.getProperties()); // set protobuf parsing info + Map<String, String> allProperties = new HashMap<>(schemaInfo.getProperties()); allProperties.put(PARSING_INFO_PROPERTY, getParsingInfo(protoMessageInstance)); - schemaInfo.setProperties(allProperties); + ((SchemaInfoImpl)schemaInfo).setProperties(allProperties); } private String getParsingInfo(T protoMessageInstance) { @@ -111,7 +110,7 @@ public class ProtobufSchema<T extends com.google.protobuf.GeneratedMessageV3> ex + " is not assignable from " + pojo.getName()); } - SchemaInfo schemaInfo = SchemaInfo.builder() + SchemaInfo schemaInfo = SchemaInfoImpl.builder() .schema(createProtobufAvroSchema(schemaDefinition.getPojo()).toString().getBytes(UTF_8)) .type(SchemaType.PROTOBUF) .name("") diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/RecordSchemaBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/RecordSchemaBuilderImpl.java index ee9f0cb..0fda7d5 100644 
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/RecordSchemaBuilderImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/RecordSchemaBuilderImpl.java @@ -105,7 +105,7 @@ public class RecordSchemaBuilderImpl implements RecordSchemaBuilder { } baseSchema.setFields(avroFields); - return new SchemaInfo( + return new SchemaInfoImpl( name, baseSchema.toString().getBytes(UTF_8), schemaType, diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/SchemaInfoUtil.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/SchemaInfoUtil.java similarity index 58% rename from pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/SchemaInfoUtil.java rename to pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/SchemaInfoUtil.java index ac5997d..fb5263e 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/SchemaInfoUtil.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/SchemaInfoUtil.java @@ -16,16 +16,20 @@ * specific language governing permissions and limitations * under the License. 
*/ -package org.apache.pulsar.common.protocol.schema; +package org.apache.pulsar.client.impl.schema; import java.nio.charset.StandardCharsets; import java.util.Collections; +import java.util.Map; import java.util.TreeMap; import lombok.experimental.UtilityClass; +import org.apache.pulsar.client.impl.schema.SchemaInfoImpl; import org.apache.pulsar.common.api.proto.KeyValue; import org.apache.pulsar.common.api.proto.Schema; import org.apache.pulsar.common.protocol.Commands; +import org.apache.pulsar.common.protocol.schema.GetSchemaResponse; +import org.apache.pulsar.common.protocol.schema.SchemaData; import org.apache.pulsar.common.schema.SchemaInfo; /** @@ -35,37 +39,39 @@ import org.apache.pulsar.common.schema.SchemaInfo; public class SchemaInfoUtil { public static SchemaInfo newSchemaInfo(String name, SchemaData data) { - SchemaInfo si = new SchemaInfo(); - si.setName(name); - si.setSchema(data.getData()); - si.setType(data.getType()); - si.setProperties(data.getProps()); - return si; + return SchemaInfoImpl.builder() + .name(name) + .schema(data.getData()) + .type(data.getType()) + .properties(data.getProps()) + .build(); } public static SchemaInfo newSchemaInfo(Schema schema) { - SchemaInfo si = new SchemaInfo(); - si.setName(schema.getName()); - si.setSchema(schema.getSchemaData()); - si.setType(Commands.getSchemaType(schema.getType())); + SchemaInfoImpl.SchemaInfoImplBuilder si = SchemaInfoImpl.builder() + .name(schema.getName()) + .schema(schema.getSchemaData()) + .type(Commands.getSchemaType(schema.getType())); if (schema.getPropertiesCount() == 0) { - si.setProperties(Collections.emptyMap()); + si.properties(Collections.emptyMap()); } else { - si.setProperties(new TreeMap<>()); + Map<String, String> properties = new TreeMap<>(); for (int i = 0; i < schema.getPropertiesCount(); i++) { KeyValue kv = schema.getPropertyAt(i); - si.getProperties().put(kv.getKey(), kv.getValue()); + properties.put(kv.getKey(), kv.getValue()); } + + si.properties(properties); } - 
return si; + return si.build(); } public static SchemaInfo newSchemaInfo(String name, GetSchemaResponse schema) { - SchemaInfo si = new SchemaInfo(); - si.setName(name); - si.setSchema(schema.getData().getBytes(StandardCharsets.UTF_8)); - si.setType(schema.getType()); - si.setProperties(schema.getProperties()); - return si; + return SchemaInfoImpl.builder() + .name(name) + .schema(schema.getData().getBytes(StandardCharsets.UTF_8)) + .type(schema.getType()) + .properties(schema.getProperties()) + .build(); } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ShortSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ShortSchema.java index 4014405..bbb5ad6 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ShortSchema.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ShortSchema.java @@ -32,7 +32,7 @@ public class ShortSchema extends AbstractSchema<Short> { private static final SchemaInfo SCHEMA_INFO; static { - SCHEMA_INFO = new SchemaInfo() + SCHEMA_INFO = new SchemaInfoImpl() .setName("INT16") .setType(SchemaType.INT16) .setSchema(new byte[0]); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/StringSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/StringSchema.java index 7e57f6c..462fa60 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/StringSchema.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/StringSchema.java @@ -46,7 +46,7 @@ public class StringSchema extends AbstractSchema<String> { // Ensure the ordering of the static initialization CHARSET_KEY = "__charset"; DEFAULT_CHARSET = StandardCharsets.UTF_8; - DEFAULT_SCHEMA_INFO = new SchemaInfo() + DEFAULT_SCHEMA_INFO = new SchemaInfoImpl() .setName("String") .setType(SchemaType.STRING) .setSchema(new byte[0]); @@ -87,7 +87,7 @@ public class StringSchema extends AbstractSchema<String> { 
this.charset = charset; Map<String, String> properties = new HashMap<>(); properties.put(CHARSET_KEY, charset.name()); - this.schemaInfo = new SchemaInfo() + this.schemaInfo = new SchemaInfoImpl() .setName(DEFAULT_SCHEMA_INFO.getName()) .setType(SchemaType.STRING) .setSchema(DEFAULT_SCHEMA_INFO.getSchema()) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/StructSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/StructSchema.java index 8cc1868..7ba116f 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/StructSchema.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/StructSchema.java @@ -105,7 +105,7 @@ public abstract class StructSchema<T> extends AbstractStructSchema<T> { } public static <T> SchemaInfo parseSchemaInfo(SchemaDefinition<T> schemaDefinition, SchemaType schemaType) { - return SchemaInfo.builder() + return SchemaInfoImpl.builder() .schema(createAvroSchema(schemaDefinition).toString().getBytes(UTF_8)) .properties(schemaDefinition.getProperties()) .name("") diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/TimeSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/TimeSchema.java index d56e4da..ab6e1ad 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/TimeSchema.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/TimeSchema.java @@ -33,7 +33,7 @@ public class TimeSchema extends AbstractSchema<Time> { private static final SchemaInfo SCHEMA_INFO; static { - SCHEMA_INFO = new SchemaInfo() + SCHEMA_INFO = new SchemaInfoImpl() .setName("Time") .setType(SchemaType.TIME) .setSchema(new byte[0]); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/TimestampSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/TimestampSchema.java index 899e159..755b466 100644 --- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/TimestampSchema.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/TimestampSchema.java @@ -33,7 +33,7 @@ public class TimestampSchema extends AbstractSchema<Timestamp> { private static final SchemaInfo SCHEMA_INFO; static { - SCHEMA_INFO = new SchemaInfo() + SCHEMA_INFO = new SchemaInfoImpl() .setName("Timestamp") .setType(SchemaType.TIMESTAMP) .setSchema(new byte[0]); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/util/SchemaUtil.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/util/SchemaUtil.java index 7e1e1c0..70d7fc0 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/util/SchemaUtil.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/util/SchemaUtil.java @@ -23,6 +23,7 @@ import org.apache.avro.reflect.ReflectData; import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.client.api.schema.SchemaDefinition; import org.apache.pulsar.client.impl.schema.SchemaDefinitionBuilderImpl; +import org.apache.pulsar.client.impl.schema.SchemaInfoImpl; import org.apache.pulsar.common.schema.SchemaInfo; import org.apache.pulsar.common.schema.SchemaType; @@ -47,7 +48,7 @@ public class SchemaUtil { } public static <T> SchemaInfo parseSchemaInfo(SchemaDefinition<T> schemaDefinition, SchemaType schemaType) { - return SchemaInfo.builder() + return SchemaInfoImpl.builder() .schema(createAvroSchema(schemaDefinition).toString().getBytes(UTF_8)) .properties(schemaDefinition.getProperties()) .name("") diff --git a/pulsar-client/src/main/resources/findbugsExclude.xml b/pulsar-client/src/main/resources/findbugsExclude.xml index bf01926..a37c886 100644 --- a/pulsar-client/src/main/resources/findbugsExclude.xml +++ b/pulsar-client/src/main/resources/findbugsExclude.xml @@ -69,6 +69,22 @@ <Class name="org.apache.pulsar.client.impl.schema.BooleanSchema"/> <Bug 
pattern="NP_BOOLEAN_RETURN_NULL"/> </Match> + + <Match> + <Class name="org.apache.pulsar.client.impl.schema.SchemaInfoImpl"/> + <Bug pattern="EI_EXPOSE_REP"/> + </Match> + + <Match> + <Class name="org.apache.pulsar.client.impl.schema.SchemaInfoImpl"/> + <Bug pattern="EI_EXPOSE_REP2"/> + </Match> + + <Match> + <Class name="org.apache.pulsar.client.impl.schema.SchemaInfoImpl$SchemaInfoImplBuilder"/> + <Bug pattern="EI_EXPOSE_REP2"/> + </Match> + <Match> <Class name="~org.apache.pulsar.client.impl.ConsumerImpl.*"/> diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/KeyValueSchemaInfoTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/KeyValueSchemaInfoTest.java index 994f013..9474b80 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/KeyValueSchemaInfoTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/KeyValueSchemaInfoTest.java @@ -171,7 +171,7 @@ public class KeyValueSchemaInfoTest { KeyValueEncodingType.SEPARATED ); - SchemaInfo oldSchemaInfo = new SchemaInfo() + SchemaInfo oldSchemaInfo = new SchemaInfoImpl() .setName("") .setType(SchemaType.KEY_VALUE) .setSchema(kvSchema.getSchemaInfo().getSchema()) diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/KeyValueSchemaTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/KeyValueSchemaTest.java index b0a86c2..bd94bf0 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/KeyValueSchemaTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/KeyValueSchemaTest.java @@ -30,6 +30,7 @@ import org.apache.pulsar.client.impl.schema.SchemaTestUtils.Color; import org.apache.pulsar.client.impl.schema.SchemaTestUtils.Foo; import org.apache.pulsar.common.schema.KeyValue; import org.apache.pulsar.common.schema.KeyValueEncodingType; +import org.apache.pulsar.common.schema.SchemaInfo; import 
org.apache.pulsar.common.schema.SchemaType; import org.testng.Assert; import org.testng.annotations.Test; @@ -67,28 +68,32 @@ public class KeyValueSchemaTest { @Test public void testFillParametersToSchemainfo() { - AvroSchema<Foo> fooSchema = AvroSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build()); - AvroSchema<Bar> barSchema = AvroSchema.of(SchemaDefinition.<Bar>builder().withPojo(Bar.class).build()); - - fooSchema.getSchemaInfo().setName("foo"); - fooSchema.getSchemaInfo().setType(SchemaType.AVRO); Map<String, String> keyProperties = Maps.newTreeMap(); keyProperties.put("foo.key1", "value"); keyProperties.put("foo.key2", "value"); - fooSchema.getSchemaInfo().setProperties(keyProperties); - barSchema.getSchemaInfo().setName("bar"); - barSchema.getSchemaInfo().setType(SchemaType.AVRO); + Map<String, String> valueProperties = Maps.newTreeMap(); valueProperties.put("bar.key", "key"); - barSchema.getSchemaInfo().setProperties(valueProperties); + + AvroSchema<Foo> fooSchema = AvroSchema.of( + SchemaDefinition.<Foo>builder() + .withPojo(Foo.class) + .withProperties(keyProperties) + .build()); + AvroSchema<Bar> barSchema = AvroSchema.of( + SchemaDefinition.<Bar>builder() + .withPojo(Bar.class) + .withProperties(valueProperties) + .build()); + Schema<KeyValue<Foo, Bar>> keyValueSchema1 = Schema.KeyValue(fooSchema, barSchema); - assertEquals(keyValueSchema1.getSchemaInfo().getProperties().get("key.schema.name"), "foo"); assertEquals(keyValueSchema1.getSchemaInfo().getProperties().get("key.schema.type"), String.valueOf(SchemaType.AVRO)); - assertEquals(keyValueSchema1.getSchemaInfo().getProperties().get("key.schema.properties"), "{\"foo.key1\":\"value\",\"foo.key2\":\"value\"}"); - assertEquals(keyValueSchema1.getSchemaInfo().getProperties().get("value.schema.name"), "bar"); + assertEquals(keyValueSchema1.getSchemaInfo().getProperties().get("key.schema.properties"), + 
"{\"__alwaysAllowNull\":\"true\",\"__jsr310ConversionEnabled\":\"false\",\"foo.key1\":\"value\",\"foo.key2\":\"value\"}"); assertEquals(keyValueSchema1.getSchemaInfo().getProperties().get("value.schema.type"), String.valueOf(SchemaType.AVRO)); - assertEquals(keyValueSchema1.getSchemaInfo().getProperties().get("value.schema.properties"), "{\"bar.key\":\"key\"}"); + assertEquals(keyValueSchema1.getSchemaInfo().getProperties().get("value.schema.properties"), + "{\"__alwaysAllowNull\":\"true\",\"__jsr310ConversionEnabled\":\"false\",\"bar.key\":\"key\"}"); } @Test diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/SchemaInfoTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/SchemaInfoTest.java index 7e34a64..f96e84e 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/SchemaInfoTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/SchemaInfoTest.java @@ -289,7 +289,7 @@ public class SchemaInfoTest { @Test public void testUnsetProperties() { - final SchemaInfo schemaInfo = SchemaInfo.builder() + final SchemaInfo schemaInfo = SchemaInfoImpl.builder() .type(SchemaType.STRING) .schema(new byte[0]) .name("string") @@ -305,7 +305,7 @@ public class SchemaInfoTest { public void testSetProperties() { final Map<String, String> map = Maps.newHashMap(); map.put("test", "value"); - final SchemaInfo schemaInfo = SchemaInfo.builder() + final SchemaInfo schemaInfo = SchemaInfoImpl.builder() .type(SchemaType.STRING) .schema(new byte[0]) .name("string") @@ -322,10 +322,16 @@ public class SchemaInfoTest { public void testNullPropertyValue() { final Map<String, String> map = new HashMap<>(); map.put("key", null); - final IntSchema schema = new IntSchema(); - schema.getSchemaInfo().setProperties(map); + + SchemaInfo si = SchemaInfoImpl.builder() + .name("INT32") + .schema(new byte[0]) + .type(SchemaType.INT32) + .properties(map) + .build(); + // null key will be skipped by Gson when 
serializing JSON to String - assertEquals(schema.getSchemaInfo().toString(), INT32_SCHEMA_INFO); + assertEquals(si.toString(), INT32_SCHEMA_INFO); } } } diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/StringSchemaTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/StringSchemaTest.java index b09bf4d..b250322 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/StringSchemaTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/StringSchemaTest.java @@ -87,7 +87,7 @@ public class StringSchemaTest { @Test public void testSchemaInfoWithoutCharset() { - SchemaInfo si = new SchemaInfo() + SchemaInfo si = new SchemaInfoImpl() .setName("test-schema-info-without-charset") .setType(SchemaType.STRING) .setSchema(new byte[0]) @@ -122,7 +122,7 @@ public class StringSchemaTest { public void testSchemaInfoWithCharset(Charset charset) { Map<String, String> properties = new HashMap<>(); properties.put(StringSchema.CHARSET_KEY, charset.name()); - SchemaInfo si = new SchemaInfo() + SchemaInfo si = new SchemaInfoImpl() .setName("test-schema-info-without-charset") .setType(SchemaType.STRING) .setSchema(new byte[0]) diff --git a/pulsar-common/pom.xml b/pulsar-common/pom.xml index fd29ce7..05028a0 100644 --- a/pulsar-common/pom.xml +++ b/pulsar-common/pom.xml @@ -183,6 +183,10 @@ <version>1.1.7.5</version> <scope>test</scope> </dependency> + <dependency> + <groupId>com.google.code.gson</groupId> + <artifactId>gson</artifactId> + </dependency> </dependencies> diff --git a/pulsar-common/src/main/java/org/apache/pulsar/client/admin/internal/data/AuthPoliciesImpl.java b/pulsar-common/src/main/java/org/apache/pulsar/client/admin/internal/data/AuthPoliciesImpl.java index 0f0429e..c58fef3 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/client/admin/internal/data/AuthPoliciesImpl.java +++ 
b/pulsar-common/src/main/java/org/apache/pulsar/client/admin/internal/data/AuthPoliciesImpl.java @@ -25,7 +25,6 @@ import java.util.TreeMap; import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; -import lombok.Value; import org.apache.pulsar.common.policies.data.AuthAction; import org.apache.pulsar.common.policies.data.AuthPolicies; @@ -47,15 +46,17 @@ public final class AuthPoliciesImpl implements AuthPolicies { return new AuthPoliciesImplBuilder(); } - private static class AuthPoliciesImplBuilder implements AuthPolicies.Builder { + + public static class AuthPoliciesImplBuilder implements AuthPolicies.Builder { private Map<String, Set<AuthAction>> namespaceAuthentication = new TreeMap<>(); - private Map<String, Map<String, Set<AuthAction>>> topicAuthentication = new TreeMap<>(); - private Map<String, Set<String>> subscriptionAuthentication = new TreeMap<>(); + private Map<String, Map<String, Set<AuthAction>>> topicAuthentication = new TreeMap<>();; + private Map<String, Set<String>> subscriptionAuthentication= new TreeMap<>();; AuthPoliciesImplBuilder() { } - public AuthPoliciesImplBuilder namespaceAuthentication(Map<String, Set<AuthAction>> namespaceAuthentication) { + public AuthPoliciesImplBuilder namespaceAuthentication( + Map<String, Set<AuthAction>> namespaceAuthentication) { this.namespaceAuthentication = namespaceAuthentication; return this; } @@ -66,7 +67,8 @@ public final class AuthPoliciesImpl implements AuthPolicies { return this; } - public AuthPoliciesImplBuilder subscriptionAuthentication(Map<String, Set<String>> subscriptionAuthentication) { + public AuthPoliciesImplBuilder subscriptionAuthentication( + Map<String, Set<String>> subscriptionAuthentication) { this.subscriptionAuthentication = subscriptionAuthentication; return this; } @@ -74,5 +76,11 @@ public final class AuthPoliciesImpl implements AuthPolicies { public AuthPoliciesImpl build() { return new AuthPoliciesImpl(namespaceAuthentication, 
topicAuthentication, subscriptionAuthentication); } + + public String toString() { + return "AuthPoliciesImpl.AuthPoliciesImplBuilder(namespaceAuthentication=" + this.namespaceAuthentication + + ", topicAuthentication=" + this.topicAuthentication + ", subscriptionAuthentication=" + + this.subscriptionAuthentication + ")"; + } } } \ No newline at end of file diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/KeyValueSchemaInfo.java b/pulsar-common/src/main/java/org/apache/pulsar/client/impl/schema/KeyValueSchemaInfo.java similarity index 96% rename from pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/KeyValueSchemaInfo.java rename to pulsar-common/src/main/java/org/apache/pulsar/client/impl/schema/KeyValueSchemaInfo.java index 9573526..5f36909 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/KeyValueSchemaInfo.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/client/impl/schema/KeyValueSchemaInfo.java @@ -46,7 +46,7 @@ public final class KeyValueSchemaInfo { @Override public SchemaInfo getSchemaInfo() { - return BytesSchema.BYTES.getSchemaInfo(); + return Schema.BYTES.getSchemaInfo(); } @Override @@ -169,11 +169,12 @@ public final class KeyValueSchemaInfo { properties.put(KV_ENCODING_TYPE, String.valueOf(keyValueEncodingType)); // generate the final schema info - return new SchemaInfo() - .setName(schemaName) - .setType(SchemaType.KEY_VALUE) - .setSchema(schemaData) - .setProperties(properties); + return SchemaInfoImpl.builder() + .name(schemaName) + .type(SchemaType.KEY_VALUE) + .schema(schemaData) + .properties(properties) + .build(); } private static void encodeSubSchemaInfoToParentSchemaProperties(SchemaInfo schemaInfo, @@ -237,7 +238,7 @@ public final class KeyValueSchemaInfo { } else { schemaProps = SchemaUtils.deserializeSchemaProperties(schemaPropsStr); } - return SchemaInfo.builder() + return SchemaInfoImpl.builder() .name(schemaName) .type(schemaType) .schema(schemaData) diff 
--git a/pulsar-client-api/src/main/java/org/apache/pulsar/common/schema/SchemaInfo.java b/pulsar-common/src/main/java/org/apache/pulsar/client/impl/schema/SchemaInfoImpl.java similarity index 83% copy from pulsar-client-api/src/main/java/org/apache/pulsar/common/schema/SchemaInfo.java copy to pulsar-common/src/main/java/org/apache/pulsar/client/impl/schema/SchemaInfoImpl.java index f2c5860..ca8b6cc 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/common/schema/SchemaInfo.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/client/impl/schema/SchemaInfoImpl.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pulsar.common.schema; +package org.apache.pulsar.client.impl.schema; import static java.nio.charset.StandardCharsets.UTF_8; import java.util.Base64; @@ -28,9 +28,11 @@ import lombok.Data; import lombok.EqualsAndHashCode; import lombok.NoArgsConstructor; import lombok.experimental.Accessors; -import org.apache.pulsar.client.internal.DefaultImplementation; import org.apache.pulsar.common.classification.InterfaceAudience; import org.apache.pulsar.common.classification.InterfaceStability; +import org.apache.pulsar.common.schema.KeyValue; +import org.apache.pulsar.common.schema.SchemaInfo; +import org.apache.pulsar.common.schema.SchemaType; /** * Information about the schema. 
@@ -42,7 +44,7 @@ import org.apache.pulsar.common.classification.InterfaceStability; @NoArgsConstructor @Accessors(chain = true) @Builder -public class SchemaInfo { +public class SchemaInfoImpl implements SchemaInfo { @EqualsAndHashCode.Exclude private String name; @@ -75,17 +77,15 @@ public class SchemaInfo { case PROTOBUF_NATIVE: return new String(schema, UTF_8); case KEY_VALUE: - KeyValue<SchemaInfo, SchemaInfo> schemaInfoKeyValue = - DefaultImplementation.decodeKeyValueSchemaInfo(this); - return DefaultImplementation.jsonifyKeyValueSchemaInfo(schemaInfoKeyValue); + KeyValue<SchemaInfo, SchemaInfo> schemaInfoKeyValue = KeyValueSchemaInfo.decodeKeyValueSchemaInfo(this); + return SchemaUtils.jsonifyKeyValueSchemaInfo(schemaInfoKeyValue); default: return Base64.getEncoder().encodeToString(schema); } } @Override - public String toString(){ - return DefaultImplementation.jsonifySchemaInfo(this); + public String toString() { + return SchemaUtils.jsonifySchemaInfo(this); } - } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/SchemaUtils.java b/pulsar-common/src/main/java/org/apache/pulsar/client/impl/schema/SchemaUtils.java similarity index 100% rename from pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/SchemaUtils.java rename to pulsar-common/src/main/java/org/apache/pulsar/client/impl/schema/SchemaUtils.java diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/SchemaData.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/SchemaData.java index 5c00f06..d5b4405 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/SchemaData.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/SchemaData.java @@ -22,6 +22,7 @@ import java.util.HashMap; import java.util.Map; import lombok.Builder; import lombok.Data; +import org.apache.pulsar.client.impl.schema.SchemaInfoImpl; import org.apache.pulsar.common.schema.SchemaInfo; import 
org.apache.pulsar.common.schema.SchemaType; @@ -45,7 +46,7 @@ public class SchemaData { * @return the converted schema info. */ public SchemaInfo toSchemaInfo() { - return SchemaInfo.builder() + return SchemaInfoImpl.builder() .name("") .type(type) .schema(data) diff --git a/pulsar-functions/runtime-all/pom.xml b/pulsar-functions/runtime-all/pom.xml index 169985b..bf700783 100644 --- a/pulsar-functions/runtime-all/pom.xml +++ b/pulsar-functions/runtime-all/pom.xml @@ -30,6 +30,19 @@ <relativePath>..</relativePath> </parent> + <!-- + + THIS MODULE SHOULD ONLY CONTAIN THE INTERFACES THAT PULSAR FUNCTION'S FRAMEWORK USES TO INTERACT WITH USER CODE. + THIS MODULE SHOULD ONLY DEPEND ON: + 1. pulsar-io-core + 2. pulsar-functions-api + 3. pulsar-client-api + 4. slf4j-api + 5. log4j-slf4j-impl + 6. log4j-api + 7. log4j-core + --> + <artifactId>pulsar-functions-runtime-all</artifactId> <name>Pulsar Functions :: Runtime All</name> @@ -48,7 +61,7 @@ <dependency> <groupId>${project.groupId}</groupId> - <artifactId>pulsar-client-original</artifactId> + <artifactId>pulsar-client-api</artifactId> <version>${project.version}</version> </dependency> @@ -77,6 +90,19 @@ <build> <plugins> + <!-- + Disable the maven-jar-plugin since we don't need the default jar and it conflicts with the maven-assembly-plugin + --> + <plugin> + <artifactId>maven-jar-plugin</artifactId> + <executions> + <execution> + <id>default-jar</id> + <phase/> + </execution> + </executions> + </plugin> + <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-assembly-plugin</artifactId> @@ -92,7 +118,7 @@ <execution> <id>make-assembly</id> <!-- bind to the packaging phase --> - <phase>package</phase> + <phase>compile</phase> <goals> <goal>single</goal> </goals> diff --git a/pulsar-functions/runtime-all/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceMain.java b/pulsar-functions/runtime-all/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceMain.java index bd64bf7..6852792 
100644 --- a/pulsar-functions/runtime-all/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceMain.java +++ b/pulsar-functions/runtime-all/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceMain.java @@ -36,6 +36,7 @@ import java.util.List; * This class will create three classloaders: * 1. The root classloader that will share interfaces between the function instance * classloader and user code classloader. This classloader will contain the following dependencies + * - pulsar-io-core * - pulsar-functions-api * - pulsar-client-api * - log4j-slf4j-impl diff --git a/pulsar-functions/runtime-all/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceDepsTest.java b/pulsar-functions/runtime-all/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceDepsTest.java new file mode 100644 index 0000000..3bdd23f --- /dev/null +++ b/pulsar-functions/runtime-all/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceDepsTest.java @@ -0,0 +1,77 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.pulsar.functions.instance; + +import lombok.Cleanup; +import lombok.extern.slf4j.Slf4j; +import org.testng.Assert; +import org.testng.annotations.Test; + +import java.io.File; +import java.io.IOException; +import java.util.Collections; +import java.util.LinkedList; +import java.util.List; +import java.util.zip.ZipEntry; +import java.util.zip.ZipInputStream; + +@Slf4j +/** + * This test serves to make sure that the correct classes are included in the java-instance.jar + * THAT JAR SHOULD ONLY CONTAIN THE INTERFACES THAT PULSAR FUNCTION'S FRAMEWORK USES TO INTERACT WITH USER CODE + * WHICH INCLUDES CLASSES FROM THE FOLLOWING LIBRARIES + * 1. pulsar-io-core + * 2. pulsar-functions-api + * 3. pulsar-client-api + * 4. slf4j-api + * 5. log4j-slf4j-impl + * 6. log4j-api + * 7. log4j-core + */ +public class JavaInstanceDepsTest { + + @Test + public void testInstanceJarDeps() throws IOException { + File jar = new File("target/java-instance.jar"); + + @Cleanup + ZipInputStream zip = new ZipInputStream(jar.toURI().toURL().openStream()); + + List<String> notAllowedClasses = new LinkedList<>(); + while(true) { + ZipEntry e = zip.getNextEntry(); + if (e == null) + break; + String name = e.getName(); + if (name.endsWith(".class") && !name.startsWith("META-INF")) { + // The only classes in the java-instance.jar should be org.apache.pulsar, slf4j, and log4j classes + // filter out those classes to see if there are any other classes that should not be allowed + if (!name.startsWith("org/apache/pulsar") + && !name.startsWith("org/slf4j") + && !name.startsWith("org/apache/logging/slf4j") + && !name.startsWith("org/apache/logging/log4j")) { + notAllowedClasses.add(name); + } + } + } + + Assert.assertEquals(notAllowedClasses, Collections.emptyList(), notAllowedClasses.toString()); + } +} diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntime.java 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntime.java index e0a9c64..474410c 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntime.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntime.java @@ -179,7 +179,6 @@ public class ThreadRuntime implements Runtime { String.format("%s-%s", FunctionCommon.getFullyQualifiedName(instanceConfig.getFunctionDetails()), instanceConfig.getInstanceId())); - this.fnThread.setContextClassLoader(functionClassLoader); this.fnThread.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() { @Override public void uncaughtException(Thread t, Throwable e) { diff --git a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/schema/KafkaSchemaWrappedSchema.java b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/schema/KafkaSchemaWrappedSchema.java index 2db9d6c..ba57692 100644 --- a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/schema/KafkaSchemaWrappedSchema.java +++ b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/schema/KafkaSchemaWrappedSchema.java @@ -27,6 +27,7 @@ import lombok.extern.slf4j.Slf4j; import org.apache.kafka.connect.json.JsonConverter; import org.apache.kafka.connect.storage.Converter; import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.impl.schema.SchemaInfoImpl; import org.apache.pulsar.client.impl.schema.generic.GenericAvroSchema; import org.apache.pulsar.common.schema.SchemaInfo; import org.apache.pulsar.common.schema.SchemaType; @@ -44,7 +45,7 @@ public class KafkaSchemaWrappedSchema implements Schema<byte[]>, Serializable { Map<String, String> props = new HashMap<>(); boolean isJsonConverter = converter instanceof JsonConverter; props.put(GenericAvroSchema.OFFSET_PROP, isJsonConverter ? 
"0" : "5"); - this.schemaInfo = SchemaInfo.builder() + this.schemaInfo = SchemaInfoImpl.builder() .name(isJsonConverter? "KafKaJson" : "KafkaAvro") .type(isJsonConverter ? SchemaType.JSON : SchemaType.AVRO) .schema(schema.toString().getBytes(UTF_8)) diff --git a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/AvroSchemaCache.java b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/AvroSchemaCache.java index eda8c96..2a9e1c4 100644 --- a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/AvroSchemaCache.java +++ b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/AvroSchemaCache.java @@ -25,6 +25,7 @@ import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.impl.schema.SchemaInfoImpl; import org.apache.pulsar.common.schema.SchemaInfo; import org.apache.pulsar.common.schema.SchemaType; @@ -65,7 +66,7 @@ final class AvroSchemaCache { org.apache.avro.Schema schema = schemaRegistryClient.getById(schemaId); String definition = schema.toString(false); log.info("Schema {} definition {}", schemaId, definition); - SchemaInfo schemaInfo = SchemaInfo.builder() + SchemaInfo schemaInfo = SchemaInfoImpl.builder() .type(SchemaType.AVRO) .name(schema.getName()) .properties(Collections.emptyMap()) diff --git a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaBytesSource.java b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaBytesSource.java index 81bf031..47bedbe 100644 --- a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaBytesSource.java +++ b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaBytesSource.java @@ -42,6 +42,7 @@ import org.apache.kafka.common.serialization.ShortDeserializer; import org.apache.kafka.common.serialization.StringDeserializer; import 
org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.impl.schema.AutoProduceBytesSchema; +import org.apache.pulsar.client.impl.schema.SchemaInfoImpl; import org.apache.pulsar.common.schema.KeyValue; import org.apache.pulsar.common.schema.SchemaInfo; import org.apache.pulsar.common.schema.SchemaType; @@ -235,7 +236,7 @@ public class KafkaBytesSource extends KafkaAbstractSource<ByteBuffer> { static final class DeferredSchemaPlaceholder extends ByteBufferSchemaWrapper { DeferredSchemaPlaceholder() { - super(SchemaInfo + super(SchemaInfoImpl .builder() .type(SchemaType.AVRO) .properties(Collections.emptyMap()) diff --git a/pulsar-sql/presto-distribution/LICENSE b/pulsar-sql/presto-distribution/LICENSE index b155e45..3f0bc09 100644 --- a/pulsar-sql/presto-distribution/LICENSE +++ b/pulsar-sql/presto-distribution/LICENSE @@ -450,7 +450,6 @@ The Apache Software License, Version 2.0 * Apache Zookeeper - zookeeper-3.6.3.jar - zookeeper-jute-3.6.3.jar - - zookeeper-prometheus-metrics-3.6.3.jar * Apache Yetus Audience Annotations - audience-annotations-0.5.0.jar * Swagger diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplit.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplit.java index 03a6b77..645edbd 100644 --- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplit.java +++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplit.java @@ -32,6 +32,7 @@ import java.io.IOException; import java.util.List; import java.util.Map; import org.apache.bookkeeper.mledger.impl.PositionImpl; +import org.apache.pulsar.client.impl.schema.SchemaInfoImpl; import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl; import org.apache.pulsar.common.schema.SchemaInfo; import org.apache.pulsar.common.schema.SchemaType; @@ -101,7 +102,7 @@ public class PulsarSplit implements ConnectorSplit { this.offloadPolicies = offloadPolicies; ObjectMapper 
objectMapper = new ObjectMapper(); - this.schemaInfo = SchemaInfo.builder() + this.schemaInfo = SchemaInfoImpl.builder() .name(originSchemaName) .type(schemaType) .schema(schema.getBytes("ISO8859-1")) diff --git a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarMetadata.java b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarMetadata.java index cd31b4b..79fb789 100644 --- a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarMetadata.java +++ b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarMetadata.java @@ -23,6 +23,7 @@ import io.prestosql.spi.PrestoException; import io.prestosql.spi.connector.*; import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.client.admin.PulsarAdminException; +import org.apache.pulsar.client.impl.schema.SchemaInfoImpl; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.schema.SchemaInfo; import org.apache.pulsar.common.schema.SchemaType; @@ -188,9 +189,10 @@ public class TestPulsarMetadata extends TestPulsarConnector { @Test(dataProvider = "rewriteNamespaceDelimiter", singleThreaded = true) public void testGetTableMetadataTableBlankSchema(String delimiter) throws PulsarAdminException { updateRewriteNamespaceDelimiterIfNeeded(delimiter); - SchemaInfo badSchemaInfo = new SchemaInfo(); - badSchemaInfo.setSchema(new byte[0]); - badSchemaInfo.setType(SchemaType.AVRO); + SchemaInfo badSchemaInfo = SchemaInfoImpl.builder() + .schema(new byte[0]) + .type(SchemaType.AVRO) + .build(); when(this.schemas.getSchemaInfo(eq(TOPIC_1.getSchemaName()))).thenReturn(badSchemaInfo); PulsarTableHandle pulsarTableHandle = new PulsarTableHandle( @@ -214,9 +216,10 @@ public class TestPulsarMetadata extends TestPulsarConnector { @Test(dataProvider = "rewriteNamespaceDelimiter", singleThreaded = true) public void testGetTableMetadataTableInvalidSchema(String delimiter) throws PulsarAdminException { 
updateRewriteNamespaceDelimiterIfNeeded(delimiter); - SchemaInfo badSchemaInfo = new SchemaInfo(); - badSchemaInfo.setSchema("foo".getBytes()); - badSchemaInfo.setType(SchemaType.AVRO); + SchemaInfo badSchemaInfo = SchemaInfoImpl.builder() + .schema("foo".getBytes()) + .type(SchemaType.AVRO) + .build(); when(this.schemas.getSchemaInfo(eq(TOPIC_1.getSchemaName()))).thenReturn(badSchemaInfo); PulsarTableHandle pulsarTableHandle = new PulsarTableHandle( diff --git a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/primitive/TestPrimitiveDecoder.java b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/primitive/TestPrimitiveDecoder.java index f4810ba..c1b97d3 100644 --- a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/primitive/TestPrimitiveDecoder.java +++ b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/primitive/TestPrimitiveDecoder.java @@ -22,6 +22,7 @@ import io.airlift.slice.Slices; import io.prestosql.decoder.DecoderColumnHandle; import io.prestosql.decoder.FieldValueProvider; import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.impl.schema.SchemaInfoImpl; import org.apache.pulsar.common.schema.SchemaInfo; import org.apache.pulsar.common.schema.SchemaType; import org.apache.pulsar.sql.presto.PulsarColumnHandle; @@ -65,7 +66,7 @@ public class TestPrimitiveDecoder extends AbstractDecoderTester { public void testPrimitiveType() { byte int8Value = 1; - SchemaInfo schemaInfoInt8 = SchemaInfo.builder().type(SchemaType.INT8).build(); + SchemaInfo schemaInfoInt8 = SchemaInfoImpl.builder().type(SchemaType.INT8).build(); Schema schemaInt8 = Schema.getSchema(schemaInfoInt8); List<PulsarColumnHandle> pulsarColumnHandleInt8 = getColumnColumnHandles(topicName, schemaInfoInt8, PulsarColumnHandle.HandleKeyValueType.NONE, false, decoderFactory); PulsarRowDecoder pulsarRowDecoderInt8 = decoderFactory.createRowDecoder(topicName, schemaInfoInt8, @@ 
-77,7 +78,7 @@ public class TestPrimitiveDecoder extends AbstractDecoderTester { PRIMITIVE_COLUMN_NAME, TINYINT, false, false, PRIMITIVE_COLUMN_NAME, null, null, PulsarColumnHandle.HandleKeyValueType.NONE), int8Value); short int16Value = 2; - SchemaInfo schemaInfoInt16 = SchemaInfo.builder().type(SchemaType.INT16).build(); + SchemaInfo schemaInfoInt16 = SchemaInfoImpl.builder().type(SchemaType.INT16).build(); Schema schemaInt16 = Schema.getSchema(schemaInfoInt16); List<PulsarColumnHandle> pulsarColumnHandleInt16 = getColumnColumnHandles(topicName, schemaInfoInt16, PulsarColumnHandle.HandleKeyValueType.NONE, false, decoderFactory); PulsarRowDecoder pulsarRowDecoderInt16 = decoderFactory.createRowDecoder(topicName, schemaInfoInt16, @@ -89,7 +90,7 @@ public class TestPrimitiveDecoder extends AbstractDecoderTester { PRIMITIVE_COLUMN_NAME, SMALLINT, false, false, PRIMITIVE_COLUMN_NAME, null, null, PulsarColumnHandle.HandleKeyValueType.NONE), int16Value); int int32Value = 2; - SchemaInfo schemaInfoInt32 = SchemaInfo.builder().type(SchemaType.INT32).build(); + SchemaInfo schemaInfoInt32 = SchemaInfoImpl.builder().type(SchemaType.INT32).build(); Schema schemaInt32 = Schema.getSchema(schemaInfoInt32); List<PulsarColumnHandle> pulsarColumnHandleInt32 = getColumnColumnHandles(topicName, schemaInfoInt32, PulsarColumnHandle.HandleKeyValueType.NONE, false, decoderFactory); @@ -102,7 +103,7 @@ public class TestPrimitiveDecoder extends AbstractDecoderTester { PRIMITIVE_COLUMN_NAME, INTEGER, false, false, PRIMITIVE_COLUMN_NAME, null, null, PulsarColumnHandle.HandleKeyValueType.NONE), int32Value); long int64Value = 2; - SchemaInfo schemaInfoInt64 = SchemaInfo.builder().type(SchemaType.INT64).build(); + SchemaInfo schemaInfoInt64 = SchemaInfoImpl.builder().type(SchemaType.INT64).build(); Schema schemaInt64 = Schema.getSchema(schemaInfoInt64); List<PulsarColumnHandle> pulsarColumnHandleInt64 = getColumnColumnHandles(topicName, schemaInfoInt64, 
PulsarColumnHandle.HandleKeyValueType.NONE, false, decoderFactory); @@ -116,7 +117,7 @@ public class TestPrimitiveDecoder extends AbstractDecoderTester { PulsarColumnHandle.HandleKeyValueType.NONE), int64Value); String stringValue = "test"; - SchemaInfo schemaInfoString = SchemaInfo.builder().type(SchemaType.STRING).build(); + SchemaInfo schemaInfoString = SchemaInfoImpl.builder().type(SchemaType.STRING).build(); Schema schemaString = Schema.getSchema(schemaInfoString); List<PulsarColumnHandle> pulsarColumnHandleString = getColumnColumnHandles(topicName, schemaInfoString, PulsarColumnHandle.HandleKeyValueType.NONE, false, decoderFactory); @@ -130,7 +131,7 @@ public class TestPrimitiveDecoder extends AbstractDecoderTester { PulsarColumnHandle.HandleKeyValueType.NONE), stringValue); float floatValue = 0.2f; - SchemaInfo schemaInfoFloat = SchemaInfo.builder().type(SchemaType.FLOAT).build(); + SchemaInfo schemaInfoFloat = SchemaInfoImpl.builder().type(SchemaType.FLOAT).build(); Schema schemaFloat = Schema.getSchema(schemaInfoFloat); List<PulsarColumnHandle> pulsarColumnHandleFloat = getColumnColumnHandles(topicName, schemaInfoFloat, PulsarColumnHandle.HandleKeyValueType.NONE, false, decoderFactory); @@ -144,7 +145,7 @@ public class TestPrimitiveDecoder extends AbstractDecoderTester { PulsarColumnHandle.HandleKeyValueType.NONE), Long.valueOf(Float.floatToIntBits(floatValue))); double doubleValue = 0.22d; - SchemaInfo schemaInfoDouble = SchemaInfo.builder().type(SchemaType.DOUBLE).build(); + SchemaInfo schemaInfoDouble = SchemaInfoImpl.builder().type(SchemaType.DOUBLE).build(); Schema schemaDouble = Schema.getSchema(schemaInfoDouble); List<PulsarColumnHandle> pulsarColumnHandleDouble = getColumnColumnHandles(topicName, schemaInfoDouble, PulsarColumnHandle.HandleKeyValueType.NONE, false, decoderFactory); @@ -158,7 +159,7 @@ public class TestPrimitiveDecoder extends AbstractDecoderTester { PulsarColumnHandle.HandleKeyValueType.NONE), doubleValue); boolean booleanValue = 
true; - SchemaInfo schemaInfoBoolean = SchemaInfo.builder().type(SchemaType.BOOLEAN).build(); + SchemaInfo schemaInfoBoolean = SchemaInfoImpl.builder().type(SchemaType.BOOLEAN).build(); Schema schemaBoolean = Schema.getSchema(schemaInfoBoolean); List<PulsarColumnHandle> pulsarColumnHandleBoolean = getColumnColumnHandles(topicName, schemaInfoBoolean, PulsarColumnHandle.HandleKeyValueType.NONE, false, decoderFactory); @@ -173,7 +174,7 @@ public class TestPrimitiveDecoder extends AbstractDecoderTester { byte[] bytesValue = new byte[1]; bytesValue[0] = 1; - SchemaInfo schemaInfoBytes = SchemaInfo.builder().type(SchemaType.BYTES).build(); + SchemaInfo schemaInfoBytes = SchemaInfoImpl.builder().type(SchemaType.BYTES).build(); Schema schemaBytes = Schema.getSchema(schemaInfoBytes); List<PulsarColumnHandle> pulsarColumnHandleBytes = getColumnColumnHandles(topicName, schemaInfoBytes, PulsarColumnHandle.HandleKeyValueType.NONE, false, decoderFactory); @@ -187,7 +188,7 @@ public class TestPrimitiveDecoder extends AbstractDecoderTester { PulsarColumnHandle.HandleKeyValueType.NONE), Slices.wrappedBuffer(bytesValue)); Date dateValue = new Date(System.currentTimeMillis()); - SchemaInfo schemaInfoDate = SchemaInfo.builder().type(SchemaType.DATE).build(); + SchemaInfo schemaInfoDate = SchemaInfoImpl.builder().type(SchemaType.DATE).build(); Schema schemaDate = Schema.getSchema(schemaInfoDate); List<PulsarColumnHandle> pulsarColumnHandleDate = getColumnColumnHandles(topicName, schemaInfoDate, PulsarColumnHandle.HandleKeyValueType.NONE, false, decoderFactory); @@ -201,7 +202,7 @@ public class TestPrimitiveDecoder extends AbstractDecoderTester { PulsarColumnHandle.HandleKeyValueType.NONE), dateValue.getTime()); Time timeValue = new Time(System.currentTimeMillis()); - SchemaInfo schemaInfoTime = SchemaInfo.builder().type(SchemaType.TIME).build(); + SchemaInfo schemaInfoTime = SchemaInfoImpl.builder().type(SchemaType.TIME).build(); Schema schemaTime = Schema.getSchema(schemaInfoTime); 
List<PulsarColumnHandle> pulsarColumnHandleTime = getColumnColumnHandles(topicName, schemaInfoTime, PulsarColumnHandle.HandleKeyValueType.NONE, false, decoderFactory); @@ -215,7 +216,7 @@ public class TestPrimitiveDecoder extends AbstractDecoderTester { PulsarColumnHandle.HandleKeyValueType.NONE), timeValue.getTime()); Timestamp timestampValue = new Timestamp(System.currentTimeMillis()); - SchemaInfo schemaInfoTimestamp = SchemaInfo.builder().type(SchemaType.TIMESTAMP).build(); + SchemaInfo schemaInfoTimestamp = SchemaInfoImpl.builder().type(SchemaType.TIMESTAMP).build(); Schema schemaTimestamp = Schema.getSchema(schemaInfoTimestamp); List<PulsarColumnHandle> pulsarColumnHandleTimestamp = getColumnColumnHandles(topicName, schemaInfoTimestamp, PulsarColumnHandle.HandleKeyValueType.NONE, false, decoderFactory); diff --git a/tests/docker-images/java-test-functions/pom.xml b/tests/docker-images/java-test-functions/pom.xml index bb5190e..140168d 100644 --- a/tests/docker-images/java-test-functions/pom.xml +++ b/tests/docker-images/java-test-functions/pom.xml @@ -36,7 +36,7 @@ </dependency> <dependency> <groupId>org.apache.pulsar</groupId> - <artifactId>pulsar-client-original</artifactId> + <artifactId>pulsar-client</artifactId> <version>${project.version}</version> </dependency> </dependencies> @@ -82,24 +82,10 @@ </transformers> <artifactSet> <includes> - <include>org.apache.pulsar:pulsar-client-original</include> - <include>org.apache.pulsar:pulsar-client-api</include> - <include>org.apache.pulsar:pulsar-client-admin-api</include> + <include>org.apache.pulsar:pulsar-client</include> <include>org.apache.pulsar:pulsar-functions-api-examples</include> </includes> </artifactSet> - <filters> - <filter> - <artifact>org.apache.pulsar:pulsar-client-original</artifact> - <includes> - <include>**</include> - </includes> - <excludes> - <!-- bouncycastle jars could not be shaded, or the signatures will be wrong--> - <exclude>org/bouncycastle/**</exclude> - </excludes> - </filter> 
- </filters> </configuration> </execution> </executions> diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/GenericRecordSourceTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/GenericRecordSourceTest.java index e0cc328..3c961fa 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/GenericRecordSourceTest.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/GenericRecordSourceTest.java @@ -45,6 +45,7 @@ import org.apache.pulsar.tests.integration.docker.ContainerExecException; import org.apache.pulsar.tests.integration.docker.ContainerExecResult; import org.apache.pulsar.tests.integration.suites.PulsarStandaloneTestSuite; import org.apache.pulsar.tests.integration.topologies.PulsarCluster; +import org.testng.Assert; import org.testng.annotations.Test; /** @@ -59,41 +60,44 @@ public class GenericRecordSourceTest extends PulsarStandaloneTestSuite { String outputTopicName = "test-state-source-output-" + randomName(8); String sourceName = "test-state-source-" + randomName(8); int numMessages = 10; + try { + submitSourceConnector( + sourceName, + outputTopicName, + "org.apache.pulsar.tests.integration.io.GenericRecordSource", JAVAJAR); - submitSourceConnector( - sourceName, - outputTopicName, - "org.apache.pulsar.tests.integration.io.GenericRecordSource", JAVAJAR); - - // get source info - getSourceInfoSuccess(container, sourceName); + // get source info + getSourceInfoSuccess(container, sourceName); - // get source status - getSourceStatus(container, sourceName); + // get source status + getSourceStatus(container, sourceName); - try (PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(container.getHttpServiceUrl()).build()) { + try (PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(container.getHttpServiceUrl()).build()) { - retryStrategically((test) -> { - try { - SourceStatus status = 
admin.sources().getSourceStatus("public", "default", sourceName); - return status.getInstances().size() > 0 - && status.getInstances().get(0).getStatus().numWritten >= 10; - } catch (PulsarAdminException e) { - return false; - } - }, 10, 200); + retryStrategically((test) -> { + try { + SourceStatus status = admin.sources().getSourceStatus("public", "default", sourceName); + return status.getInstances().size() > 0 + && status.getInstances().get(0).getStatus().numWritten >= 10; + } catch (PulsarAdminException e) { + return false; + } + }, 10, 200); - SourceStatus status = admin.sources().getSourceStatus("public", "default", sourceName); - assertEquals(status.getInstances().size(), 1); - assertTrue(status.getInstances().get(0).getStatus().numWritten >= 10); - } + SourceStatus status = admin.sources().getSourceStatus("public", "default", sourceName); + assertEquals(status.getInstances().size(), 1); + assertTrue(status.getInstances().get(0).getStatus().numWritten >= 10); + } - consumeMessages(container, outputTopicName, numMessages); + consumeMessages(container, outputTopicName, numMessages); - // delete source - deleteSource(container, sourceName); + // delete source + deleteSource(container, sourceName); - getSourceInfoNotFound(container, sourceName); + getSourceInfoNotFound(container, sourceName); + } finally { + dumpFunctionLogs(sourceName); + } } @@ -129,15 +133,35 @@ public class GenericRecordSourceTest extends PulsarStandaloneTestSuite { } private static void getSourceStatus(StandaloneContainer container,String sourceName) throws Exception { + retryStrategically((test) -> { + try { + ContainerExecResult result = container.execCmd( + PulsarCluster.ADMIN_SCRIPT, + "sources", + "status", + "--tenant", "public", + "--namespace", "default", + "--name", sourceName); + + if (result.getStdout().contains("\"running\" : true")) { + return true; + } + return false; + } catch (Exception e) { + log.error("Encountered error when getting source status", e); + return false; + } 
+ }, 10, 200); + ContainerExecResult result = container.execCmd( - PulsarCluster.ADMIN_SCRIPT, - "sources", - "status", - "--tenant", "public", - "--namespace", "default", - "--name", sourceName - ); - assertTrue(result.getStdout().contains("\"running\" : true")); + PulsarCluster.ADMIN_SCRIPT, + "sources", + "status", + "--tenant", "public", + "--namespace", "default", + "--name", sourceName); + + Assert.assertTrue(result.getStdout().contains("\"running\" : true")); } private static void consumeMessages(StandaloneContainer container, String outputTopic,
