This is an automated email from the ASF dual-hosted git repository. eolivelli pushed a commit to branch branch-2.9 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 4939204baad8241c3fb1a92d35d3ac7650754e04 Author: Enrico Olivelli <[email protected]> AuthorDate: Tue Nov 9 14:31:33 2021 +0100 Pulsar Client: restore SchemaInfo.builder() API (#12673) (cherry picked from commit 849e4dc5fa59b774287436a054efb17d198054b4) --- .../admin/AdminApiSchemaValidationEnforced.java | 8 ++-- .../schema/JsonSchemaCompatibilityCheckTest.java | 3 +- .../SchemaCompatibilityCheckTest.java | 5 +-- .../pulsar/client/admin/internal/SchemasImpl.java | 3 +- .../PulsarClientImplementationBinding.java | 2 + .../apache/pulsar/common/schema/SchemaInfo.java | 48 ++++++++++++++++++++++ .../PulsarClientImplementationBindingImpl.java | 34 +++------------ .../pulsar/client/impl/schema/SchemaInfoTest.java | 6 +-- .../pulsar/common/protocol/schema/SchemaData.java | 3 +- .../connect/schema/KafkaSchemaWrappedSchema.java | 3 +- .../apache/pulsar/io/kafka/AvroSchemaCache.java | 3 +- .../org/apache/pulsar/sql/presto/PulsarSplit.java | 3 +- .../pulsar/sql/presto/TestPulsarMetadata.java | 4 +- .../decoder/primitive/TestPrimitiveDecoder.java | 24 +++++------ 14 files changed, 82 insertions(+), 67 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaValidationEnforced.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaValidationEnforced.java index 3daf920..b7747de 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaValidationEnforced.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaValidationEnforced.java @@ -30,9 +30,7 @@ import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.Schema; -import org.apache.pulsar.client.impl.schema.SchemaInfoImpl; import org.apache.pulsar.common.policies.data.ClusterData; -import org.apache.pulsar.common.policies.data.ClusterDataImpl; import org.apache.pulsar.common.policies.data.TenantInfoImpl; import org.apache.pulsar.common.protocol.schema.PostSchemaPayload; import org.apache.pulsar.common.schema.SchemaInfo; @@ -98,7 +96,7 @@ public class AdminApiSchemaValidationEnforced extends MockedPulsarServiceBaseTes assertTrue(e.getMessage().contains("HTTP 404 Not Found")); } Map<String, String> properties = Maps.newHashMap(); - SchemaInfo schemaInfo = SchemaInfoImpl.builder() + SchemaInfo schemaInfo = SchemaInfo.builder() .type(SchemaType.STRING) .properties(properties) .name("test") @@ -147,7 +145,7 @@ public class AdminApiSchemaValidationEnforced extends MockedPulsarServiceBaseTes } Map<String, String> properties = Maps.newHashMap(); properties.put("key1", "value1"); - SchemaInfo schemaInfo = SchemaInfoImpl.builder() + SchemaInfo schemaInfo = SchemaInfo.builder() .type(SchemaType.STRING) .properties(properties) .name("test") @@ -177,7 +175,7 @@ public class AdminApiSchemaValidationEnforced extends MockedPulsarServiceBaseTes } admin.namespaces().setSchemaValidationEnforced(namespace,true); Map<String, String> properties = Maps.newHashMap(); - SchemaInfo schemaInfo = SchemaInfoImpl.builder() + SchemaInfo schemaInfo = SchemaInfo.builder() .type(SchemaType.STRING) .properties(properties) .name("test") diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/JsonSchemaCompatibilityCheckTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/JsonSchemaCompatibilityCheckTest.java index 32a9f9e..9bf0189 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/JsonSchemaCompatibilityCheckTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/JsonSchemaCompatibilityCheckTest.java @@ -33,7 +33,6 @@ import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.SchemaSerializationException; import org.apache.pulsar.client.api.schema.SchemaDefinition; import org.apache.pulsar.client.impl.schema.JSONSchema; -import org.apache.pulsar.client.impl.schema.SchemaInfoImpl; import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy; import org.apache.pulsar.common.protocol.schema.SchemaData; import org.apache.pulsar.common.schema.SchemaInfo; @@ -119,7 +118,7 @@ public class JsonSchemaCompatibilityCheckTest extends BaseAvroSchemaCompatibilit JsonSchemaGenerator schemaGen = new JsonSchemaGenerator(mapper); JsonSchema schema = schemaGen.generateSchema(pojo); - SchemaInfo info = SchemaInfoImpl.builder() + SchemaInfo info = SchemaInfo.builder() .name("") .properties(properties) .type(SchemaType.JSON) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaCompatibilityCheckTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaCompatibilityCheckTest.java index 8def5dc..02913c6 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaCompatibilityCheckTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaCompatibilityCheckTest.java @@ -35,15 +35,12 @@ import org.apache.pulsar.client.api.ProducerBuilder; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.SchemaSerializationException; import org.apache.pulsar.client.api.schema.SchemaDefinition; -import org.apache.pulsar.client.impl.schema.SchemaInfoImpl; import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.TopicDomain; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.ClusterData; -import org.apache.pulsar.common.policies.data.ClusterDataImpl; import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy; import org.apache.pulsar.common.policies.data.TenantInfo; -import org.apache.pulsar.common.policies.data.TenantInfoImpl; import org.apache.pulsar.common.schema.SchemaInfo; import org.apache.pulsar.common.schema.SchemaType; import org.apache.pulsar.schema.Schemas; @@ -324,7 +321,7 @@ public class SchemaCompatibilityCheckTest extends MockedPulsarServiceBaseTest { SchemaCompatibilityStrategy.FULL); byte[] changeSchemaBytes = (new String(Schema.AVRO(Schemas.PersonOne.class) .getSchemaInfo().getSchema(), UTF_8) + "/n /n /n").getBytes(); - SchemaInfo schemaInfo = SchemaInfoImpl.builder().type(SchemaType.AVRO).schema(changeSchemaBytes).build(); + SchemaInfo schemaInfo = SchemaInfo.builder().type(SchemaType.AVRO).schema(changeSchemaBytes).build(); admin.schemas().createSchema(fqtn, schemaInfo); admin.namespaces().setIsAllowAutoUpdateSchema(namespaceName.toString(), false); diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SchemasImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SchemasImpl.java index a072acd..54e731e 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SchemasImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SchemasImpl.java @@ -32,7 +32,6 @@ import javax.ws.rs.client.WebTarget; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.admin.Schemas; import org.apache.pulsar.client.api.Authentication; -import org.apache.pulsar.client.impl.schema.SchemaInfoImpl; import org.apache.pulsar.client.internal.DefaultImplementation; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.protocol.schema.DeleteSchemaResponse; @@ -449,7 +448,7 @@ public class SchemasImpl extends BaseResource implements Schemas { schema = response.getData().getBytes(UTF_8); } - return SchemaInfoImpl.builder() + return SchemaInfo.builder() .schema(schema) .type(response.getType()) .properties(response.getProperties()) diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/PulsarClientImplementationBinding.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/PulsarClientImplementationBinding.java index f7bcf05..c8c300c 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/PulsarClientImplementationBinding.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/PulsarClientImplementationBinding.java @@ -251,4 +251,6 @@ public interface PulsarClientImplementationBinding { byteBuffer.get(array); return array; } + + SchemaInfo newSchemaInfoImpl(String name, byte[] schema, SchemaType type, Map<String, String> propertiesValue); } diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/common/schema/SchemaInfo.java b/pulsar-client-api/src/main/java/org/apache/pulsar/common/schema/SchemaInfo.java index 01ba746..e05b0d2 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/common/schema/SchemaInfo.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/common/schema/SchemaInfo.java @@ -18,8 +18,10 @@ */ package org.apache.pulsar.common.schema; +import java.util.Collections; import java.util.Map; +import org.apache.pulsar.client.internal.DefaultImplementation; import org.apache.pulsar.common.classification.InterfaceAudience; import org.apache.pulsar.common.classification.InterfaceStability; @@ -48,4 +50,50 @@ public interface SchemaInfo { Map<String, String> getProperties(); String getSchemaDefinition(); + + static SchemaInfoBuilder builder() { + return new SchemaInfoBuilder(); + } + + class SchemaInfoBuilder { + private String name; + private byte[] schema; + private SchemaType type; + private Map<String, String> properties; + private boolean propertiesSet; + + SchemaInfoBuilder() { + } + + public SchemaInfoBuilder name(String name) { + this.name = name; + return this; + } + + public SchemaInfoBuilder schema(byte[] schema) { + this.schema = schema; + return this; + } + + public SchemaInfoBuilder type(SchemaType type) { + this.type = type; + return this; + } + + public SchemaInfoBuilder properties(Map<String, String> properties) { + this.properties = properties; + this.propertiesSet = true; + return this; + } + + public SchemaInfo build() { + Map<String, String> propertiesValue = this.properties; + if (!this.propertiesSet) { + propertiesValue = Collections.emptyMap(); + } + return DefaultImplementation + .getDefaultImplementation() + .newSchemaInfoImpl(name, schema, type, propertiesValue); + } + } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImplementationBindingImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImplementationBindingImpl.java index c146f23..c7f3af9 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImplementationBindingImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImplementationBindingImpl.java @@ -45,35 +45,7 @@ import org.apache.pulsar.client.api.schema.SchemaDefinition; import org.apache.pulsar.client.api.schema.SchemaDefinitionBuilder; import org.apache.pulsar.client.impl.auth.AuthenticationTls; import org.apache.pulsar.client.impl.auth.AuthenticationToken; -import org.apache.pulsar.client.impl.schema.AutoConsumeSchema; -import org.apache.pulsar.client.impl.schema.AutoProduceBytesSchema; -import org.apache.pulsar.client.impl.schema.AvroSchema; -import org.apache.pulsar.client.impl.schema.BooleanSchema; -import org.apache.pulsar.client.impl.schema.ByteBufferSchema; -import org.apache.pulsar.client.impl.schema.ByteSchema; -import org.apache.pulsar.client.impl.schema.BytesSchema; -import org.apache.pulsar.client.impl.schema.DateSchema; -import org.apache.pulsar.client.impl.schema.DoubleSchema; -import org.apache.pulsar.client.impl.schema.FloatSchema; -import org.apache.pulsar.client.impl.schema.InstantSchema; -import org.apache.pulsar.client.impl.schema.IntSchema; -import org.apache.pulsar.client.impl.schema.JSONSchema; -import org.apache.pulsar.client.impl.schema.KeyValueSchemaImpl; -import org.apache.pulsar.client.impl.schema.KeyValueSchemaInfo; -import org.apache.pulsar.client.impl.schema.LocalDateSchema; -import org.apache.pulsar.client.impl.schema.LocalDateTimeSchema; -import org.apache.pulsar.client.impl.schema.LocalTimeSchema; -import org.apache.pulsar.client.impl.schema.LongSchema; -import org.apache.pulsar.client.impl.schema.NativeAvroBytesSchema; -import org.apache.pulsar.client.impl.schema.ProtobufNativeSchema; -import org.apache.pulsar.client.impl.schema.ProtobufSchema; -import org.apache.pulsar.client.impl.schema.RecordSchemaBuilderImpl; -import org.apache.pulsar.client.impl.schema.SchemaDefinitionBuilderImpl; -import org.apache.pulsar.client.impl.schema.SchemaUtils; -import org.apache.pulsar.client.impl.schema.ShortSchema; -import org.apache.pulsar.client.impl.schema.StringSchema; -import org.apache.pulsar.client.impl.schema.TimeSchema; -import org.apache.pulsar.client.impl.schema.TimestampSchema; +import org.apache.pulsar.client.impl.schema.*; import org.apache.pulsar.client.impl.schema.generic.GenericProtobufNativeSchema; import org.apache.pulsar.client.impl.schema.generic.GenericSchemaImpl; import org.apache.pulsar.client.internal.PulsarClientImplementationBinding; @@ -383,4 +355,8 @@ public final class PulsarClientImplementationBindingImpl implements PulsarClient public MessagePayloadFactory newDefaultMessagePayloadFactory() { return new MessagePayloadFactoryImpl(); } + + public SchemaInfo newSchemaInfoImpl(String name, byte[] schema, SchemaType type, Map<String, String> propertiesValue) { + return new SchemaInfoImpl(name, schema, type, propertiesValue); + } } diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/SchemaInfoTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/SchemaInfoTest.java index f96e84e..719704c 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/SchemaInfoTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/SchemaInfoTest.java @@ -289,7 +289,7 @@ public class SchemaInfoTest { @Test public void testUnsetProperties() { - final SchemaInfo schemaInfo = SchemaInfoImpl.builder() + final SchemaInfo schemaInfo = SchemaInfo.builder() .type(SchemaType.STRING) .schema(new byte[0]) .name("string") @@ -305,7 +305,7 @@ public class SchemaInfoTest { public void testSetProperties() { final Map<String, String> map = Maps.newHashMap(); map.put("test", "value"); - final SchemaInfo schemaInfo = SchemaInfoImpl.builder() + final SchemaInfo schemaInfo = SchemaInfo.builder() .type(SchemaType.STRING) .schema(new byte[0]) .name("string") @@ -323,7 +323,7 @@ public class SchemaInfoTest { final Map<String, String> map = new HashMap<>(); map.put("key", null); - SchemaInfo si = SchemaInfoImpl.builder() + SchemaInfo si = SchemaInfo.builder() .name("INT32") .schema(new byte[0]) .type(SchemaType.INT32) diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/SchemaData.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/SchemaData.java index d5b4405..5c00f06 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/SchemaData.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/SchemaData.java @@ -22,7 +22,6 @@ import java.util.HashMap; import java.util.Map; import lombok.Builder; import lombok.Data; -import org.apache.pulsar.client.impl.schema.SchemaInfoImpl; import org.apache.pulsar.common.schema.SchemaInfo; import org.apache.pulsar.common.schema.SchemaType; @@ -46,7 +45,7 @@ public class SchemaData { * @return the converted schema info. */ public SchemaInfo toSchemaInfo() { - return SchemaInfoImpl.builder() + return SchemaInfo.builder() .name("") .type(type) .schema(data) diff --git a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/schema/KafkaSchemaWrappedSchema.java b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/schema/KafkaSchemaWrappedSchema.java index ba57692..2db9d6c 100644 --- a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/schema/KafkaSchemaWrappedSchema.java +++ b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/schema/KafkaSchemaWrappedSchema.java @@ -27,7 +27,6 @@ import lombok.extern.slf4j.Slf4j; import org.apache.kafka.connect.json.JsonConverter; import org.apache.kafka.connect.storage.Converter; import org.apache.pulsar.client.api.Schema; -import org.apache.pulsar.client.impl.schema.SchemaInfoImpl; import org.apache.pulsar.client.impl.schema.generic.GenericAvroSchema; import org.apache.pulsar.common.schema.SchemaInfo; import org.apache.pulsar.common.schema.SchemaType; @@ -45,7 +44,7 @@ public class KafkaSchemaWrappedSchema implements Schema<byte[]>, Serializable { Map<String, String> props = new HashMap<>(); boolean isJsonConverter = converter instanceof JsonConverter; props.put(GenericAvroSchema.OFFSET_PROP, isJsonConverter ? "0" : "5"); - this.schemaInfo = SchemaInfoImpl.builder() + this.schemaInfo = SchemaInfo.builder() .name(isJsonConverter? "KafKaJson" : "KafkaAvro") .type(isJsonConverter ? SchemaType.JSON : SchemaType.AVRO) .schema(schema.toString().getBytes(UTF_8)) diff --git a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/AvroSchemaCache.java b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/AvroSchemaCache.java index 2a9e1c4..eda8c96 100644 --- a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/AvroSchemaCache.java +++ b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/AvroSchemaCache.java @@ -25,7 +25,6 @@ import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.client.api.Schema; -import org.apache.pulsar.client.impl.schema.SchemaInfoImpl; import org.apache.pulsar.common.schema.SchemaInfo; import org.apache.pulsar.common.schema.SchemaType; @@ -66,7 +65,7 @@ final class AvroSchemaCache { org.apache.avro.Schema schema = schemaRegistryClient.getById(schemaId); String definition = schema.toString(false); log.info("Schema {} definition {}", schemaId, definition); - SchemaInfo schemaInfo = SchemaInfoImpl.builder() + SchemaInfo schemaInfo = SchemaInfo.builder() .type(SchemaType.AVRO) .name(schema.getName()) .properties(Collections.emptyMap()) diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplit.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplit.java index 645edbd..03a6b77 100644 --- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplit.java +++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplit.java @@ -32,7 +32,6 @@ import java.io.IOException; import java.util.List; import java.util.Map; import org.apache.bookkeeper.mledger.impl.PositionImpl; -import org.apache.pulsar.client.impl.schema.SchemaInfoImpl; import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl; import org.apache.pulsar.common.schema.SchemaInfo; import org.apache.pulsar.common.schema.SchemaType; @@ -102,7 +101,7 @@ public class PulsarSplit implements ConnectorSplit { this.offloadPolicies = offloadPolicies; ObjectMapper objectMapper = new ObjectMapper(); - this.schemaInfo = SchemaInfoImpl.builder() + this.schemaInfo = SchemaInfo.builder() .name(originSchemaName) .type(schemaType) .schema(schema.getBytes("ISO8859-1")) diff --git a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarMetadata.java b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarMetadata.java index 79fb789..26199ba 100644 --- a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarMetadata.java +++ b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarMetadata.java @@ -189,7 +189,7 @@ public class TestPulsarMetadata extends TestPulsarConnector { @Test(dataProvider = "rewriteNamespaceDelimiter", singleThreaded = true) public void testGetTableMetadataTableBlankSchema(String delimiter) throws PulsarAdminException { updateRewriteNamespaceDelimiterIfNeeded(delimiter); - SchemaInfo badSchemaInfo = SchemaInfoImpl.builder() + SchemaInfo badSchemaInfo = SchemaInfo.builder() .schema(new byte[0]) .type(SchemaType.AVRO) .build(); @@ -216,7 +216,7 @@ public class TestPulsarMetadata extends TestPulsarConnector { @Test(dataProvider = "rewriteNamespaceDelimiter", singleThreaded = true) public void testGetTableMetadataTableInvalidSchema(String delimiter) throws PulsarAdminException { updateRewriteNamespaceDelimiterIfNeeded(delimiter); - SchemaInfo badSchemaInfo = SchemaInfoImpl.builder() + SchemaInfo badSchemaInfo = SchemaInfo.builder() .schema("foo".getBytes()) .type(SchemaType.AVRO) .build(); diff --git a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/primitive/TestPrimitiveDecoder.java b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/primitive/TestPrimitiveDecoder.java index c1b97d3..d01210b 100644 --- a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/primitive/TestPrimitiveDecoder.java +++ b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/primitive/TestPrimitiveDecoder.java @@ -66,7 +66,7 @@ public class TestPrimitiveDecoder extends AbstractDecoderTester { public void testPrimitiveType() { byte int8Value = 1; - SchemaInfo schemaInfoInt8 = SchemaInfoImpl.builder().type(SchemaType.INT8).build(); + SchemaInfo schemaInfoInt8 = SchemaInfo.builder().type(SchemaType.INT8).build(); Schema schemaInt8 = Schema.getSchema(schemaInfoInt8); List<PulsarColumnHandle> pulsarColumnHandleInt8 = getColumnColumnHandles(topicName, schemaInfoInt8, PulsarColumnHandle.HandleKeyValueType.NONE, false, decoderFactory); PulsarRowDecoder pulsarRowDecoderInt8 = decoderFactory.createRowDecoder(topicName, schemaInfoInt8, @@ -78,7 +78,7 @@ public class TestPrimitiveDecoder extends AbstractDecoderTester { PRIMITIVE_COLUMN_NAME, TINYINT, false, false, PRIMITIVE_COLUMN_NAME, null, null, PulsarColumnHandle.HandleKeyValueType.NONE), int8Value); short int16Value = 2; - SchemaInfo schemaInfoInt16 = SchemaInfoImpl.builder().type(SchemaType.INT16).build(); + SchemaInfo schemaInfoInt16 = SchemaInfo.builder().type(SchemaType.INT16).build(); Schema schemaInt16 = Schema.getSchema(schemaInfoInt16); List<PulsarColumnHandle> pulsarColumnHandleInt16 = getColumnColumnHandles(topicName, schemaInfoInt16, PulsarColumnHandle.HandleKeyValueType.NONE, false, decoderFactory); PulsarRowDecoder pulsarRowDecoderInt16 = decoderFactory.createRowDecoder(topicName, schemaInfoInt16, @@ -90,7 +90,7 @@ public class TestPrimitiveDecoder extends AbstractDecoderTester { PRIMITIVE_COLUMN_NAME, SMALLINT, false, false, PRIMITIVE_COLUMN_NAME, null, null, PulsarColumnHandle.HandleKeyValueType.NONE), int16Value); int int32Value = 2; - SchemaInfo schemaInfoInt32 = SchemaInfoImpl.builder().type(SchemaType.INT32).build(); + SchemaInfo schemaInfoInt32 = SchemaInfo.builder().type(SchemaType.INT32).build(); Schema schemaInt32 = Schema.getSchema(schemaInfoInt32); List<PulsarColumnHandle> pulsarColumnHandleInt32 = getColumnColumnHandles(topicName, schemaInfoInt32, PulsarColumnHandle.HandleKeyValueType.NONE, false, decoderFactory); @@ -103,7 +103,7 @@ public class TestPrimitiveDecoder extends AbstractDecoderTester { PRIMITIVE_COLUMN_NAME, INTEGER, false, false, PRIMITIVE_COLUMN_NAME, null, null, PulsarColumnHandle.HandleKeyValueType.NONE), int32Value); long int64Value = 2; - SchemaInfo schemaInfoInt64 = SchemaInfoImpl.builder().type(SchemaType.INT64).build(); + SchemaInfo schemaInfoInt64 = SchemaInfo.builder().type(SchemaType.INT64).build(); Schema schemaInt64 = Schema.getSchema(schemaInfoInt64); List<PulsarColumnHandle> pulsarColumnHandleInt64 = getColumnColumnHandles(topicName, schemaInfoInt64, PulsarColumnHandle.HandleKeyValueType.NONE, false, decoderFactory); @@ -117,7 +117,7 @@ public class TestPrimitiveDecoder extends AbstractDecoderTester { PulsarColumnHandle.HandleKeyValueType.NONE), int64Value); String stringValue = "test"; - SchemaInfo schemaInfoString = SchemaInfoImpl.builder().type(SchemaType.STRING).build(); + SchemaInfo schemaInfoString = SchemaInfo.builder().type(SchemaType.STRING).build(); Schema schemaString = Schema.getSchema(schemaInfoString); List<PulsarColumnHandle> pulsarColumnHandleString = getColumnColumnHandles(topicName, schemaInfoString, PulsarColumnHandle.HandleKeyValueType.NONE, false, decoderFactory); @@ -131,7 +131,7 @@ public class TestPrimitiveDecoder extends AbstractDecoderTester { PulsarColumnHandle.HandleKeyValueType.NONE), stringValue); float floatValue = 0.2f; - SchemaInfo schemaInfoFloat = SchemaInfoImpl.builder().type(SchemaType.FLOAT).build(); + SchemaInfo schemaInfoFloat = SchemaInfo.builder().type(SchemaType.FLOAT).build(); Schema schemaFloat = Schema.getSchema(schemaInfoFloat); List<PulsarColumnHandle> pulsarColumnHandleFloat = getColumnColumnHandles(topicName, schemaInfoFloat, PulsarColumnHandle.HandleKeyValueType.NONE, false, decoderFactory); @@ -145,7 +145,7 @@ public class TestPrimitiveDecoder extends AbstractDecoderTester { PulsarColumnHandle.HandleKeyValueType.NONE), Long.valueOf(Float.floatToIntBits(floatValue))); double doubleValue = 0.22d; - SchemaInfo schemaInfoDouble = SchemaInfoImpl.builder().type(SchemaType.DOUBLE).build(); + SchemaInfo schemaInfoDouble = SchemaInfo.builder().type(SchemaType.DOUBLE).build(); Schema schemaDouble = Schema.getSchema(schemaInfoDouble); List<PulsarColumnHandle> pulsarColumnHandleDouble = getColumnColumnHandles(topicName, schemaInfoDouble, PulsarColumnHandle.HandleKeyValueType.NONE, false, decoderFactory); @@ -159,7 +159,7 @@ public class TestPrimitiveDecoder extends AbstractDecoderTester { PulsarColumnHandle.HandleKeyValueType.NONE), doubleValue); boolean booleanValue = true; - SchemaInfo schemaInfoBoolean = SchemaInfoImpl.builder().type(SchemaType.BOOLEAN).build(); + SchemaInfo schemaInfoBoolean = SchemaInfo.builder().type(SchemaType.BOOLEAN).build(); Schema schemaBoolean = Schema.getSchema(schemaInfoBoolean); List<PulsarColumnHandle> pulsarColumnHandleBoolean = getColumnColumnHandles(topicName, schemaInfoBoolean, PulsarColumnHandle.HandleKeyValueType.NONE, false, decoderFactory); @@ -174,7 +174,7 @@ public class TestPrimitiveDecoder extends AbstractDecoderTester { byte[] bytesValue = new byte[1]; bytesValue[0] = 1; - SchemaInfo schemaInfoBytes = SchemaInfoImpl.builder().type(SchemaType.BYTES).build(); + SchemaInfo schemaInfoBytes = SchemaInfo.builder().type(SchemaType.BYTES).build(); Schema schemaBytes = Schema.getSchema(schemaInfoBytes); List<PulsarColumnHandle> pulsarColumnHandleBytes = getColumnColumnHandles(topicName, schemaInfoBytes, PulsarColumnHandle.HandleKeyValueType.NONE, false, decoderFactory); @@ -188,7 +188,7 @@ public class TestPrimitiveDecoder extends AbstractDecoderTester { PulsarColumnHandle.HandleKeyValueType.NONE), Slices.wrappedBuffer(bytesValue)); Date dateValue = new Date(System.currentTimeMillis()); - SchemaInfo schemaInfoDate = SchemaInfoImpl.builder().type(SchemaType.DATE).build(); + SchemaInfo schemaInfoDate = SchemaInfo.builder().type(SchemaType.DATE).build(); Schema schemaDate = Schema.getSchema(schemaInfoDate); List<PulsarColumnHandle> pulsarColumnHandleDate = getColumnColumnHandles(topicName, schemaInfoDate, PulsarColumnHandle.HandleKeyValueType.NONE, false, decoderFactory); @@ -202,7 +202,7 @@ public class TestPrimitiveDecoder extends AbstractDecoderTester { PulsarColumnHandle.HandleKeyValueType.NONE), dateValue.getTime()); Time timeValue = new Time(System.currentTimeMillis()); - SchemaInfo schemaInfoTime = SchemaInfoImpl.builder().type(SchemaType.TIME).build(); + SchemaInfo schemaInfoTime = SchemaInfo.builder().type(SchemaType.TIME).build(); Schema schemaTime = Schema.getSchema(schemaInfoTime); List<PulsarColumnHandle> pulsarColumnHandleTime = getColumnColumnHandles(topicName, schemaInfoTime, PulsarColumnHandle.HandleKeyValueType.NONE, false, decoderFactory); @@ -216,7 +216,7 @@ public class TestPrimitiveDecoder extends AbstractDecoderTester { PulsarColumnHandle.HandleKeyValueType.NONE), timeValue.getTime()); Timestamp timestampValue = new Timestamp(System.currentTimeMillis()); - SchemaInfo schemaInfoTimestamp = SchemaInfoImpl.builder().type(SchemaType.TIMESTAMP).build(); + SchemaInfo schemaInfoTimestamp = SchemaInfo.builder().type(SchemaType.TIMESTAMP).build(); Schema schemaTimestamp = Schema.getSchema(schemaInfoTimestamp); List<PulsarColumnHandle> pulsarColumnHandleTimestamp = getColumnColumnHandles(topicName, schemaInfoTimestamp, PulsarColumnHandle.HandleKeyValueType.NONE, false, decoderFactory);
