This is an automated email from the ASF dual-hosted git repository.

tison pushed a commit to branch v3.0
in repository https://gitbox.apache.org/repos/asf/flink-connector-pulsar.git


The following commit(s) were added to refs/heads/v3.0 by this push:
     new 985e1b3  [FLINK-32003] Upgrade pulsar-client version to work with 
OAuth2 (#44)
985e1b3 is described below

commit 985e1b308141768071475424fa6838620a4295f6
Author: tison <[email protected]>
AuthorDate: Mon May 8 11:55:50 2023 +0800

    [FLINK-32003] Upgrade pulsar-client version to work with OAuth2 (#44)
    
    Signed-off-by: tison <[email protected]>
---
 .../connector/pulsar/common/schema/PulsarSchema.java      | 15 ++++++++++++++-
 .../connector/pulsar/common/schema/PulsarSchemaUtils.java |  8 ++++++--
 .../runtime/container/PulsarContainerRuntime.java         |  8 ++++----
 .../src/main/resources/META-INF/NOTICE                    |  8 ++++----
 pom.xml                                                   |  6 +++---
 5 files changed, 31 insertions(+), 14 deletions(-)

diff --git 
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/schema/PulsarSchema.java
 
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/schema/PulsarSchema.java
index bb09315..fe5daca 100644
--- 
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/schema/PulsarSchema.java
+++ 
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/schema/PulsarSchema.java
@@ -155,6 +155,9 @@ public final class PulsarSchema<T> implements Serializable {
             oos.writeUTF(entry.getKey());
             oos.writeUTF(entry.getValue());
         }
+
+        // Timestamp
+        oos.writeLong(schemaInfo.getTimestamp());
     }
 
     private void readObject(ObjectInputStream ois) throws 
ClassNotFoundException, IOException {
@@ -177,7 +180,17 @@ public final class PulsarSchema<T> implements Serializable 
{
             properties.put(ois.readUTF(), ois.readUTF());
         }
 
-        this.schemaInfo = new SchemaInfoImpl(name, schemaBytes, type, 
properties);
+        // Timestamp
+        long timestamp = ois.readLong();
+
+        this.schemaInfo =
+                SchemaInfoImpl.builder()
+                        .name(name)
+                        .schema(schemaBytes)
+                        .type(type)
+                        .properties(properties)
+                        .timestamp(timestamp)
+                        .build();
         this.schema = createSchema(schemaInfo);
     }
 
diff --git 
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/schema/PulsarSchemaUtils.java
 
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/schema/PulsarSchemaUtils.java
index 4b1f7ee..90b4242 100644
--- 
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/schema/PulsarSchemaUtils.java
+++ 
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/schema/PulsarSchemaUtils.java
@@ -181,8 +181,12 @@ public final class PulsarSchemaUtils {
         Map<String, String> properties = new 
HashMap<>(schemaInfo.getProperties());
         properties.put(CLASS_INFO_PLACEHOLDER, typeClass.getName());
 
-        return new SchemaInfoImpl(
-                schemaInfo.getName(), schemaInfo.getSchema(), 
schemaInfo.getType(), properties);
+        return SchemaInfoImpl.builder()
+                .name(schemaInfo.getName())
+                .schema(schemaInfo.getSchema())
+                .type(schemaInfo.getType())
+                .properties(properties)
+                .build();
     }
 
     @SuppressWarnings("unchecked")
diff --git 
a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/container/PulsarContainerRuntime.java
 
b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/container/PulsarContainerRuntime.java
index eac42f0..56d79d7 100644
--- 
a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/container/PulsarContainerRuntime.java
+++ 
b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/container/PulsarContainerRuntime.java
@@ -20,7 +20,6 @@ package 
org.apache.flink.connector.pulsar.testutils.runtime.container;
 
 import org.apache.flink.connector.pulsar.testutils.runtime.PulsarRuntime;
 import 
org.apache.flink.connector.pulsar.testutils.runtime.PulsarRuntimeOperator;
-import org.apache.flink.util.DockerImageVersions;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -32,7 +31,6 @@ import org.testcontainers.utility.DockerImageName;
 import java.time.Duration;
 import java.util.concurrent.atomic.AtomicBoolean;
 
-import static org.apache.flink.util.DockerImageVersions.PULSAR;
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.testcontainers.containers.PulsarContainer.BROKER_HTTP_PORT;
@@ -56,9 +54,11 @@ public class PulsarContainerRuntime implements PulsarRuntime 
{
             String.format("http://%s:%d";, PULSAR_INTERNAL_HOSTNAME, 
BROKER_HTTP_PORT);
 
     /**
-     * Create a pulsar container provider by a predefined version, this 
constance {@link
-     * DockerImageVersions#PULSAR} should be bumped after the new pulsar 
release.
+     * Create a pulsar container provider by a predefined version, this 
constant should be bumped
+     * after the new pulsar release.
      */
+    public static final String PULSAR = "apachepulsar/pulsar:2.10.2";
+
     private final PulsarContainer container = new 
PulsarContainer(DockerImageName.parse(PULSAR));
 
     private final AtomicBoolean started = new AtomicBoolean(false);
diff --git a/flink-sql-connector-pulsar/src/main/resources/META-INF/NOTICE 
b/flink-sql-connector-pulsar/src/main/resources/META-INF/NOTICE
index 151da33..1aee6f4 100644
--- a/flink-sql-connector-pulsar/src/main/resources/META-INF/NOTICE
+++ b/flink-sql-connector-pulsar/src/main/resources/META-INF/NOTICE
@@ -6,10 +6,10 @@ The Apache Software Foundation (http://www.apache.org/).
 
 This project bundles the following dependencies under the Apache Software 
License 2.0 (http://www.apache.org/licenses/LICENSE-2.0.txt)
 
-- org.apache.pulsar:bouncy-castle-bc:pkg:2.10.1
-- org.apache.pulsar:pulsar-client-admin-api:2.10.1
-- org.apache.pulsar:pulsar-client-all:2.10.1
-- org.apache.pulsar:pulsar-client-api:2.10.1
+- org.apache.pulsar:bouncy-castle-bc:pkg:2.10.2
+- org.apache.pulsar:pulsar-client-admin-api:2.10.2
+- org.apache.pulsar:pulsar-client-all:2.10.2
+- org.apache.pulsar:pulsar-client-api:2.10.2
 - org.slf4j:jul-to-slf4j:1.7.32
 
 This project bundles the following dependencies under the Bouncy Castle 
license.
diff --git a/pom.xml b/pom.xml
index 2fa2d4c..e34dbce 100644
--- a/pom.xml
+++ b/pom.xml
@@ -50,13 +50,13 @@ under the License.
 
     <properties>
         <flink.version>1.16.0</flink.version>
-        <pulsar.version>2.10.1</pulsar.version>
+        <pulsar.version>2.10.2</pulsar.version>
 
         <jackson-bom.version>2.13.4.20221013</jackson-bom.version>
         <grpc-bom.version>1.45.1</grpc-bom.version>
         <bouncycastle.version>1.69</bouncycastle.version>
         <google.auth.version>1.4.0</google.auth.version>
-        <jetty.version>9.4.44.v20210927</jetty.version>
+        <jetty.version>9.4.48.v20220622</jetty.version>
         <simpleclient.version>0.8.1</simpleclient.version>
         <junit4.version>4.13.2</junit4.version>
         <junit5.version>5.8.1</junit5.version>
@@ -689,4 +689,4 @@ under the License.
             </plugin>
         </plugins>
     </build>
-</project>
\ No newline at end of file
+</project>

Reply via email to