This is an automated email from the ASF dual-hosted git repository.
tison pushed a commit to branch v3.0
in repository https://gitbox.apache.org/repos/asf/flink-connector-pulsar.git
The following commit(s) were added to refs/heads/v3.0 by this push:
new 985e1b3 [FLINK-32003] Upgrade pulsar-client version to work with
OAuth2 (#44)
985e1b3 is described below
commit 985e1b308141768071475424fa6838620a4295f6
Author: tison <[email protected]>
AuthorDate: Mon May 8 11:55:50 2023 +0800
[FLINK-32003] Upgrade pulsar-client version to work with OAuth2 (#44)
Signed-off-by: tison <[email protected]>
---
.../connector/pulsar/common/schema/PulsarSchema.java | 15 ++++++++++++++-
.../connector/pulsar/common/schema/PulsarSchemaUtils.java | 8 ++++++--
.../runtime/container/PulsarContainerRuntime.java | 8 ++++----
.../src/main/resources/META-INF/NOTICE | 8 ++++----
pom.xml | 6 +++---
5 files changed, 31 insertions(+), 14 deletions(-)
diff --git
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/schema/PulsarSchema.java
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/schema/PulsarSchema.java
index bb09315..fe5daca 100644
---
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/schema/PulsarSchema.java
+++
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/schema/PulsarSchema.java
@@ -155,6 +155,9 @@ public final class PulsarSchema<T> implements Serializable {
oos.writeUTF(entry.getKey());
oos.writeUTF(entry.getValue());
}
+
+ // Timestamp
+ oos.writeLong(schemaInfo.getTimestamp());
}
private void readObject(ObjectInputStream ois) throws
ClassNotFoundException, IOException {
@@ -177,7 +180,17 @@ public final class PulsarSchema<T> implements Serializable
{
properties.put(ois.readUTF(), ois.readUTF());
}
- this.schemaInfo = new SchemaInfoImpl(name, schemaBytes, type,
properties);
+ // Timestamp
+ long timestamp = ois.readLong();
+
+ this.schemaInfo =
+ SchemaInfoImpl.builder()
+ .name(name)
+ .schema(schemaBytes)
+ .type(type)
+ .properties(properties)
+ .timestamp(timestamp)
+ .build();
this.schema = createSchema(schemaInfo);
}
diff --git
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/schema/PulsarSchemaUtils.java
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/schema/PulsarSchemaUtils.java
index 4b1f7ee..90b4242 100644
---
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/schema/PulsarSchemaUtils.java
+++
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/schema/PulsarSchemaUtils.java
@@ -181,8 +181,12 @@ public final class PulsarSchemaUtils {
Map<String, String> properties = new
HashMap<>(schemaInfo.getProperties());
properties.put(CLASS_INFO_PLACEHOLDER, typeClass.getName());
- return new SchemaInfoImpl(
- schemaInfo.getName(), schemaInfo.getSchema(),
schemaInfo.getType(), properties);
+ return SchemaInfoImpl.builder()
+ .name(schemaInfo.getName())
+ .schema(schemaInfo.getSchema())
+ .type(schemaInfo.getType())
+ .properties(properties)
+ .build();
}
@SuppressWarnings("unchecked")
diff --git
a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/container/PulsarContainerRuntime.java
b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/container/PulsarContainerRuntime.java
index eac42f0..56d79d7 100644
---
a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/container/PulsarContainerRuntime.java
+++
b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/container/PulsarContainerRuntime.java
@@ -20,7 +20,6 @@ package
org.apache.flink.connector.pulsar.testutils.runtime.container;
import org.apache.flink.connector.pulsar.testutils.runtime.PulsarRuntime;
import
org.apache.flink.connector.pulsar.testutils.runtime.PulsarRuntimeOperator;
-import org.apache.flink.util.DockerImageVersions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -32,7 +31,6 @@ import org.testcontainers.utility.DockerImageName;
import java.time.Duration;
import java.util.concurrent.atomic.AtomicBoolean;
-import static org.apache.flink.util.DockerImageVersions.PULSAR;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.testcontainers.containers.PulsarContainer.BROKER_HTTP_PORT;
@@ -56,9 +54,11 @@ public class PulsarContainerRuntime implements PulsarRuntime
{
String.format("http://%s:%d", PULSAR_INTERNAL_HOSTNAME,
BROKER_HTTP_PORT);
/**
- * Create a pulsar container provider by a predefined version, this
constance {@link
- * DockerImageVersions#PULSAR} should be bumped after the new pulsar
release.
+ * Create a pulsar container provider by a predefined version, this
constant should be bumped
+ * after the new pulsar release.
*/
+ public static final String PULSAR = "apachepulsar/pulsar:2.10.2";
+
private final PulsarContainer container = new
PulsarContainer(DockerImageName.parse(PULSAR));
private final AtomicBoolean started = new AtomicBoolean(false);
diff --git a/flink-sql-connector-pulsar/src/main/resources/META-INF/NOTICE
b/flink-sql-connector-pulsar/src/main/resources/META-INF/NOTICE
index 151da33..1aee6f4 100644
--- a/flink-sql-connector-pulsar/src/main/resources/META-INF/NOTICE
+++ b/flink-sql-connector-pulsar/src/main/resources/META-INF/NOTICE
@@ -6,10 +6,10 @@ The Apache Software Foundation (http://www.apache.org/).
This project bundles the following dependencies under the Apache Software
License 2.0 (http://www.apache.org/licenses/LICENSE-2.0.txt)
-- org.apache.pulsar:bouncy-castle-bc:pkg:2.10.1
-- org.apache.pulsar:pulsar-client-admin-api:2.10.1
-- org.apache.pulsar:pulsar-client-all:2.10.1
-- org.apache.pulsar:pulsar-client-api:2.10.1
+- org.apache.pulsar:bouncy-castle-bc:pkg:2.10.2
+- org.apache.pulsar:pulsar-client-admin-api:2.10.2
+- org.apache.pulsar:pulsar-client-all:2.10.2
+- org.apache.pulsar:pulsar-client-api:2.10.2
- org.slf4j:jul-to-slf4j:1.7.32
This project bundles the following dependencies under the Bouncy Castle
license.
diff --git a/pom.xml b/pom.xml
index 2fa2d4c..e34dbce 100644
--- a/pom.xml
+++ b/pom.xml
@@ -50,13 +50,13 @@ under the License.
<properties>
<flink.version>1.16.0</flink.version>
- <pulsar.version>2.10.1</pulsar.version>
+ <pulsar.version>2.10.2</pulsar.version>
<jackson-bom.version>2.13.4.20221013</jackson-bom.version>
<grpc-bom.version>1.45.1</grpc-bom.version>
<bouncycastle.version>1.69</bouncycastle.version>
<google.auth.version>1.4.0</google.auth.version>
- <jetty.version>9.4.44.v20210927</jetty.version>
+ <jetty.version>9.4.48.v20220622</jetty.version>
<simpleclient.version>0.8.1</simpleclient.version>
<junit4.version>4.13.2</junit4.version>
<junit5.version>5.8.1</junit5.version>
@@ -689,4 +689,4 @@ under the License.
</plugin>
</plugins>
</build>
-</project>
\ No newline at end of file
+</project>