This is an automated email from the ASF dual-hosted git repository.
yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new f15ca986811 Use ByteBuffer instead of BytesString which is unsupported
in Schema Coders (#31746)
f15ca986811 is described below
commit f15ca98681128cb2a32eacc3a3d1691587d2eabf
Author: Bartosz Zablocki <[email protected]>
AuthorDate: Wed Jul 3 19:58:43 2024 +0200
Use ByteBuffer instead of BytesString which is unsupported in Schema Coders
(#31746)
---
sdks/java/io/solace/build.gradle | 1 -
.../java/org/apache/beam/sdk/io/solace/data/Solace.java | 15 +++++++--------
.../apache/beam/sdk/io/solace/data/SolaceDataUtils.java | 5 ++---
3 files changed, 9 insertions(+), 12 deletions(-)
diff --git a/sdks/java/io/solace/build.gradle b/sdks/java/io/solace/build.gradle
index 11737750164..7c643dc9127 100644
--- a/sdks/java/io/solace/build.gradle
+++ b/sdks/java/io/solace/build.gradle
@@ -38,7 +38,6 @@ dependencies {
implementation library.java.google_cloud_core
implementation library.java.vendored_guava_32_1_2_jre
implementation project(":sdks:java:extensions:avro")
- implementation library.java.vendored_grpc_1_60_1
implementation library.java.avro
permitUnusedDeclared library.java.avro
implementation library.java.google_api_common
diff --git
a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/data/Solace.java
b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/data/Solace.java
index 97d688bff23..18fee918444 100644
---
a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/data/Solace.java
+++
b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/data/Solace.java
@@ -21,9 +21,9 @@ import com.google.auto.value.AutoValue;
import com.solacesystems.jcsmp.BytesXMLMessage;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
+import java.nio.ByteBuffer;
import org.apache.beam.sdk.schemas.AutoValueSchema;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
-import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.ByteString;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -127,7 +127,7 @@ public class Solace {
*
* @return The message payload.
*/
- public abstract ByteString getPayload();
+ public abstract ByteBuffer getPayload();
/**
* Gets the destination (topic or queue) to which the message was sent.
*
@@ -234,7 +234,7 @@ public class Solace {
*
* @return The attachment data, or an empty ByteString if no attachment is
present.
*/
- public abstract ByteString getAttachmentBytes();
+ public abstract ByteBuffer getAttachmentBytes();
static Builder builder() {
return new AutoValue_Solace_Record.Builder();
@@ -244,7 +244,7 @@ public class Solace {
abstract static class Builder {
abstract Builder setMessageId(@Nullable String messageId);
- abstract Builder setPayload(ByteString payload);
+ abstract Builder setPayload(ByteBuffer payload);
abstract Builder setDestination(@Nullable Destination destination);
@@ -266,7 +266,7 @@ public class Solace {
abstract Builder setReplicationGroupMessageId(@Nullable String
replicationGroupMessageId);
- abstract Builder setAttachmentBytes(ByteString attachmentBytes);
+ abstract Builder setAttachmentBytes(ByteBuffer attachmentBytes);
abstract Record build();
}
@@ -313,10 +313,9 @@ public class Solace {
Destination replyTo = getDestination(msg.getCorrelationId(),
msg.getReplyTo());
Destination destination = getDestination(msg.getCorrelationId(),
msg.getDestination());
-
return Record.builder()
.setMessageId(msg.getApplicationMessageId())
- .setPayload(ByteString.copyFrom(payloadBytesStream.toByteArray()))
+ .setPayload(ByteBuffer.wrap(payloadBytesStream.toByteArray()))
.setDestination(destination)
.setExpiration(msg.getExpiration())
.setPriority(msg.getPriority())
@@ -330,7 +329,7 @@ public class Solace {
msg.getReplicationGroupMessageId() != null
? msg.getReplicationGroupMessageId().toString()
: null)
-
.setAttachmentBytes(ByteString.copyFrom(attachmentBytesStream.toByteArray()))
+
.setAttachmentBytes(ByteBuffer.wrap(attachmentBytesStream.toByteArray()))
.build();
}
diff --git
a/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/data/SolaceDataUtils.java
b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/data/SolaceDataUtils.java
index 2e953150c6d..5134bd131d7 100644
---
a/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/data/SolaceDataUtils.java
+++
b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/data/SolaceDataUtils.java
@@ -38,7 +38,6 @@ import java.util.Objects;
import org.apache.beam.sdk.schemas.JavaBeanSchema;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
import org.apache.beam.sdk.transforms.SerializableFunction;
-import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.ByteString;
import org.checkerframework.checker.nullness.qual.Nullable;
public class SolaceDataUtils {
@@ -101,7 +100,7 @@ public class SolaceDataUtils {
: DEFAULT_REPLICATION_GROUP_ID.toString();
return Solace.Record.builder()
- .setPayload(ByteString.copyFrom(payload, StandardCharsets.UTF_8))
+ .setPayload(ByteBuffer.wrap(payload.getBytes(StandardCharsets.UTF_8)))
.setMessageId(messageId)
.setDestination(
Solace.Destination.builder()
@@ -117,7 +116,7 @@ public class SolaceDataUtils {
.setTimeToLive(1000L)
.setSenderTimestamp(null)
.setReplicationGroupMessageId(replicationGroupMessageIdString)
- .setAttachmentBytes(ByteString.EMPTY)
+ .setAttachmentBytes(ByteBuffer.wrap(new byte[0]))
.build();
}