This is an automated email from the ASF dual-hosted git repository. tzulitai pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink-statefun.git
commit 36973beb8081c1005964dc2c082ebe41d0064b43 Author: Tzu-Li (Gordon) Tai <[email protected]> AuthorDate: Thu Jan 28 14:05:36 2021 +0800 [FLINK-21154] Rename packages for request-reply protocol Proto messages --- .../statefun/flink/core/common/PolyglotUtil.java | 2 +- .../flink/core/httpfn/HttpRequestReplyClient.java | 4 ++-- .../reqreply/PersistedRemoteFunctionValues.java | 10 +++++----- .../flink/core/reqreply/RequestReplyClient.java | 4 ++-- .../flink/core/reqreply/RequestReplyFunction.java | 14 +++++++------- .../PersistedRemoteFunctionValuesTest.java | 8 ++++---- .../core/reqreply/RequestReplyFunctionTest.java | 22 +++++++++++----------- .../io/kafka/GenericKafkaEgressSerializer.java | 2 +- .../polyglot/GenericKinesisEgressSerializer.java | 2 +- .../src/main/protobuf/io/kafka-egress.proto | 5 +++-- .../src/main/protobuf/io/kinesis-egress.proto | 5 +++-- .../src/main/protobuf/sdk/request-reply.proto | 15 ++++++++------- 12 files changed, 48 insertions(+), 45 deletions(-) diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/common/PolyglotUtil.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/common/PolyglotUtil.java index 18cc5bb..eeb9468 100644 --- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/common/PolyglotUtil.java +++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/common/PolyglotUtil.java @@ -22,8 +22,8 @@ import com.google.protobuf.Message; import com.google.protobuf.Parser; import java.io.IOException; import java.io.InputStream; -import org.apache.flink.statefun.flink.core.polyglot.generated.Address; import org.apache.flink.statefun.sdk.FunctionType; +import org.apache.flink.statefun.sdk.reqreply.generated.Address; public final class PolyglotUtil { private PolyglotUtil() {} diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpRequestReplyClient.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpRequestReplyClient.java index f27528d..fd7e5fb 100644 --- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpRequestReplyClient.java +++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpRequestReplyClient.java @@ -33,10 +33,10 @@ import okhttp3.Request; import okhttp3.RequestBody; import okhttp3.Response; import org.apache.flink.statefun.flink.core.metrics.RemoteInvocationMetrics; -import org.apache.flink.statefun.flink.core.polyglot.generated.FromFunction; -import org.apache.flink.statefun.flink.core.polyglot.generated.ToFunction; import org.apache.flink.statefun.flink.core.reqreply.RequestReplyClient; import org.apache.flink.statefun.flink.core.reqreply.ToFunctionRequestSummary; +import org.apache.flink.statefun.sdk.reqreply.generated.FromFunction; +import org.apache.flink.statefun.sdk.reqreply.generated.ToFunction; import org.apache.flink.util.IOUtils; final class HttpRequestReplyClient implements RequestReplyClient { diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/reqreply/PersistedRemoteFunctionValues.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/reqreply/PersistedRemoteFunctionValues.java index e0b43bc..42cffbe 100644 --- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/reqreply/PersistedRemoteFunctionValues.java +++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/reqreply/PersistedRemoteFunctionValues.java @@ -23,14 +23,14 @@ import java.time.Duration; import java.util.HashMap; import java.util.List; import java.util.Map; -import org.apache.flink.statefun.flink.core.polyglot.generated.FromFunction.ExpirationSpec; -import org.apache.flink.statefun.flink.core.polyglot.generated.FromFunction.PersistedValueMutation; -import org.apache.flink.statefun.flink.core.polyglot.generated.FromFunction.PersistedValueSpec; -import org.apache.flink.statefun.flink.core.polyglot.generated.ToFunction; -import org.apache.flink.statefun.flink.core.polyglot.generated.ToFunction.InvocationBatchRequest; import org.apache.flink.statefun.flink.core.types.remote.RemoteValueTypeMismatchException; import org.apache.flink.statefun.sdk.TypeName; import org.apache.flink.statefun.sdk.annotations.Persisted; +import org.apache.flink.statefun.sdk.reqreply.generated.FromFunction.ExpirationSpec; +import org.apache.flink.statefun.sdk.reqreply.generated.FromFunction.PersistedValueMutation; +import org.apache.flink.statefun.sdk.reqreply.generated.FromFunction.PersistedValueSpec; +import org.apache.flink.statefun.sdk.reqreply.generated.ToFunction; +import org.apache.flink.statefun.sdk.reqreply.generated.ToFunction.InvocationBatchRequest; import org.apache.flink.statefun.sdk.state.Expiration; import org.apache.flink.statefun.sdk.state.PersistedStateRegistry; import org.apache.flink.statefun.sdk.state.RemotePersistedValue; diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/reqreply/RequestReplyClient.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/reqreply/RequestReplyClient.java index fef64c5..19fddee 100644 --- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/reqreply/RequestReplyClient.java +++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/reqreply/RequestReplyClient.java @@ -20,8 +20,8 @@ package org.apache.flink.statefun.flink.core.reqreply; import java.util.concurrent.CompletableFuture; import org.apache.flink.statefun.flink.core.metrics.RemoteInvocationMetrics; -import org.apache.flink.statefun.flink.core.polyglot.generated.FromFunction; -import org.apache.flink.statefun.flink.core.polyglot.generated.ToFunction; +import org.apache.flink.statefun.sdk.reqreply.generated.FromFunction; +import org.apache.flink.statefun.sdk.reqreply.generated.ToFunction; public interface RequestReplyClient { diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/reqreply/RequestReplyFunction.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/reqreply/RequestReplyFunction.java index b2054f2..51db78c 100644 --- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/reqreply/RequestReplyFunction.java +++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/reqreply/RequestReplyFunction.java @@ -28,19 +28,19 @@ import java.util.concurrent.CompletableFuture; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.statefun.flink.core.backpressure.InternalContext; import org.apache.flink.statefun.flink.core.metrics.RemoteInvocationMetrics; -import org.apache.flink.statefun.flink.core.polyglot.generated.FromFunction; -import org.apache.flink.statefun.flink.core.polyglot.generated.FromFunction.EgressMessage; -import org.apache.flink.statefun.flink.core.polyglot.generated.FromFunction.IncompleteInvocationContext; -import org.apache.flink.statefun.flink.core.polyglot.generated.FromFunction.InvocationResponse; -import org.apache.flink.statefun.flink.core.polyglot.generated.ToFunction; -import org.apache.flink.statefun.flink.core.polyglot.generated.ToFunction.Invocation; -import org.apache.flink.statefun.flink.core.polyglot.generated.ToFunction.InvocationBatchRequest; import org.apache.flink.statefun.sdk.Address; import org.apache.flink.statefun.sdk.AsyncOperationResult; import org.apache.flink.statefun.sdk.Context; import org.apache.flink.statefun.sdk.StatefulFunction; import org.apache.flink.statefun.sdk.annotations.Persisted; import org.apache.flink.statefun.sdk.io.EgressIdentifier; +import org.apache.flink.statefun.sdk.reqreply.generated.FromFunction; +import org.apache.flink.statefun.sdk.reqreply.generated.FromFunction.EgressMessage; +import org.apache.flink.statefun.sdk.reqreply.generated.FromFunction.IncompleteInvocationContext; +import org.apache.flink.statefun.sdk.reqreply.generated.FromFunction.InvocationResponse; +import org.apache.flink.statefun.sdk.reqreply.generated.ToFunction; +import org.apache.flink.statefun.sdk.reqreply.generated.ToFunction.Invocation; +import org.apache.flink.statefun.sdk.reqreply.generated.ToFunction.InvocationBatchRequest; import org.apache.flink.statefun.sdk.state.PersistedAppendingBuffer; import org.apache.flink.statefun.sdk.state.PersistedValue; import org.apache.flink.types.Either; diff --git a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/reqreply/PersistedRemoteFunctionValuesTest.java b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/reqreply/PersistedRemoteFunctionValuesTest.java index a563886..b5f2927 100644 --- a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/reqreply/PersistedRemoteFunctionValuesTest.java +++ b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/reqreply/PersistedRemoteFunctionValuesTest.java @@ -26,11 +26,11 @@ import static org.hamcrest.core.Is.is; import com.google.protobuf.ByteString; import java.util.Arrays; import java.util.Collections; -import org.apache.flink.statefun.flink.core.polyglot.generated.FromFunction.PersistedValueMutation; -import org.apache.flink.statefun.flink.core.polyglot.generated.FromFunction.PersistedValueSpec; -import org.apache.flink.statefun.flink.core.polyglot.generated.ToFunction.InvocationBatchRequest; -import org.apache.flink.statefun.flink.core.polyglot.generated.ToFunction.PersistedValue; import org.apache.flink.statefun.sdk.TypeName; +import org.apache.flink.statefun.sdk.reqreply.generated.FromFunction.PersistedValueMutation; +import org.apache.flink.statefun.sdk.reqreply.generated.FromFunction.PersistedValueSpec; +import org.apache.flink.statefun.sdk.reqreply.generated.ToFunction.InvocationBatchRequest; +import org.apache.flink.statefun.sdk.reqreply.generated.ToFunction.PersistedValue; import org.junit.Test; public class PersistedRemoteFunctionValuesTest { diff --git a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/reqreply/RequestReplyFunctionTest.java b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/reqreply/RequestReplyFunctionTest.java index d545281..9b5d9c9 100644 --- a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/reqreply/RequestReplyFunctionTest.java +++ b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/reqreply/RequestReplyFunctionTest.java @@ -42,22 +42,22 @@ import org.apache.flink.statefun.flink.core.TestUtils; import org.apache.flink.statefun.flink.core.backpressure.InternalContext; import org.apache.flink.statefun.flink.core.metrics.FunctionTypeMetrics; import org.apache.flink.statefun.flink.core.metrics.RemoteInvocationMetrics; -import org.apache.flink.statefun.flink.core.polyglot.generated.FromFunction; -import org.apache.flink.statefun.flink.core.polyglot.generated.FromFunction.DelayedInvocation; -import org.apache.flink.statefun.flink.core.polyglot.generated.FromFunction.EgressMessage; -import org.apache.flink.statefun.flink.core.polyglot.generated.FromFunction.ExpirationSpec; -import org.apache.flink.statefun.flink.core.polyglot.generated.FromFunction.IncompleteInvocationContext; -import org.apache.flink.statefun.flink.core.polyglot.generated.FromFunction.InvocationResponse; -import org.apache.flink.statefun.flink.core.polyglot.generated.FromFunction.PersistedValueMutation; -import org.apache.flink.statefun.flink.core.polyglot.generated.FromFunction.PersistedValueMutation.MutationType; -import org.apache.flink.statefun.flink.core.polyglot.generated.FromFunction.PersistedValueSpec; -import org.apache.flink.statefun.flink.core.polyglot.generated.ToFunction; -import org.apache.flink.statefun.flink.core.polyglot.generated.ToFunction.Invocation; import org.apache.flink.statefun.sdk.Address; import org.apache.flink.statefun.sdk.AsyncOperationResult; import org.apache.flink.statefun.sdk.AsyncOperationResult.Status; import org.apache.flink.statefun.sdk.FunctionType; import org.apache.flink.statefun.sdk.io.EgressIdentifier; +import org.apache.flink.statefun.sdk.reqreply.generated.FromFunction; +import org.apache.flink.statefun.sdk.reqreply.generated.FromFunction.DelayedInvocation; +import org.apache.flink.statefun.sdk.reqreply.generated.FromFunction.EgressMessage; +import org.apache.flink.statefun.sdk.reqreply.generated.FromFunction.ExpirationSpec; +import org.apache.flink.statefun.sdk.reqreply.generated.FromFunction.IncompleteInvocationContext; +import org.apache.flink.statefun.sdk.reqreply.generated.FromFunction.InvocationResponse; +import org.apache.flink.statefun.sdk.reqreply.generated.FromFunction.PersistedValueMutation; +import org.apache.flink.statefun.sdk.reqreply.generated.FromFunction.PersistedValueMutation.MutationType; +import org.apache.flink.statefun.sdk.reqreply.generated.FromFunction.PersistedValueSpec; +import org.apache.flink.statefun.sdk.reqreply.generated.ToFunction; +import org.apache.flink.statefun.sdk.reqreply.generated.ToFunction.Invocation; import org.junit.Test; public class RequestReplyFunctionTest { diff --git a/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kafka/GenericKafkaEgressSerializer.java b/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kafka/GenericKafkaEgressSerializer.java index e20bdf1..fb8a484 100644 --- a/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kafka/GenericKafkaEgressSerializer.java +++ b/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kafka/GenericKafkaEgressSerializer.java @@ -20,7 +20,7 @@ package org.apache.flink.statefun.flink.io.kafka; import com.google.protobuf.Any; import com.google.protobuf.InvalidProtocolBufferException; import java.nio.charset.StandardCharsets; -import org.apache.flink.statefun.flink.io.generated.KafkaProducerRecord; +import org.apache.flink.statefun.sdk.egress.generated.KafkaProducerRecord; import org.apache.flink.statefun.sdk.kafka.KafkaEgressSerializer; import org.apache.kafka.clients.producer.ProducerRecord; diff --git a/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kinesis/polyglot/GenericKinesisEgressSerializer.java b/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kinesis/polyglot/GenericKinesisEgressSerializer.java index db3db58..4b1c522 100644 --- a/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kinesis/polyglot/GenericKinesisEgressSerializer.java +++ b/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kinesis/polyglot/GenericKinesisEgressSerializer.java @@ -20,7 +20,7 @@ package org.apache.flink.statefun.flink.io.kinesis.polyglot; import com.google.protobuf.Any; import com.google.protobuf.InvalidProtocolBufferException; -import org.apache.flink.statefun.flink.io.generated.KinesisEgressRecord; +import org.apache.flink.statefun.sdk.egress.generated.KinesisEgressRecord; import org.apache.flink.statefun.sdk.kinesis.egress.EgressRecord; import org.apache.flink.statefun.sdk.kinesis.egress.KinesisEgressSerializer; diff --git a/statefun-sdk-protos/src/main/protobuf/io/kafka-egress.proto b/statefun-sdk-protos/src/main/protobuf/io/kafka-egress.proto index 9284e1c..dc280b6 100644 --- a/statefun-sdk-protos/src/main/protobuf/io/kafka-egress.proto +++ b/statefun-sdk-protos/src/main/protobuf/io/kafka-egress.proto @@ -18,8 +18,9 @@ syntax = "proto3"; -package org.apache.flink.statefun.flink.io; -option java_package = "org.apache.flink.statefun.flink.io.generated"; +package io.statefun.sdk.egress; + +option java_package = "org.apache.flink.statefun.sdk.egress.generated"; option java_multiple_files = true; message KafkaProducerRecord { diff --git a/statefun-sdk-protos/src/main/protobuf/io/kinesis-egress.proto b/statefun-sdk-protos/src/main/protobuf/io/kinesis-egress.proto index 68c92c0..a365443 100644 --- a/statefun-sdk-protos/src/main/protobuf/io/kinesis-egress.proto +++ b/statefun-sdk-protos/src/main/protobuf/io/kinesis-egress.proto @@ -18,8 +18,9 @@ syntax = "proto3"; -package org.apache.flink.statefun.flink.io; -option java_package = "org.apache.flink.statefun.flink.io.generated"; +package io.statefun.sdk.egress; + +option java_package = "org.apache.flink.statefun.sdk.egress.generated"; option java_multiple_files = true; message KinesisEgressRecord { diff --git a/statefun-sdk-protos/src/main/protobuf/sdk/request-reply.proto b/statefun-sdk-protos/src/main/protobuf/sdk/request-reply.proto index b70e3a4..2ebd8f9 100644 --- a/statefun-sdk-protos/src/main/protobuf/sdk/request-reply.proto +++ b/statefun-sdk-protos/src/main/protobuf/sdk/request-reply.proto @@ -18,8 +18,9 @@ syntax = "proto3"; -package org.apache.flink.statefun.flink.core.polyglot; -option java_package = "org.apache.flink.statefun.flink.core.polyglot.generated"; +package io.statefun.sdk.reqreply; + +option java_package = "org.apache.flink.statefun.sdk.reqreply.generated"; option java_multiple_files = true; import "google/protobuf/any.proto"; @@ -128,7 +129,7 @@ message FromFunction { google.protobuf.Any argument = 3; } - // InvocationResponse represents a result of an org.apache.flink.statefun.flink.core.polyglot.ToFunction.InvocationBatchRequest + // InvocationResponse represents a result of an io.statefun.sdk.reqreply.ToFunction.InvocationBatchRequest // it contains a list of state mutation to preform as a result of computing this batch, and a list of outgoing messages. message InvocationResponse { repeated PersistedValueMutation state_mutations = 1; @@ -155,17 +156,17 @@ message FromFunction { string type_typename = 3; } - // IncompleteInvocationContext represents a result of an org.apache.flink.statefun.flink.core.polyglot.ToFunction.InvocationBatchRequest, + // IncompleteInvocationContext represents a result of an io.statefun.sdk.reqreply.ToFunction.InvocationBatchRequest, // which should be used as the response if the InvocationBatchRequest provided incomplete information about the // invocation, e.g. insufficient state values were provided. message IncompleteInvocationContext { repeated PersistedValueSpec missing_values = 1; } - // Response sent from the function, as a result of an org.apache.flink.statefun.flink.core.polyglot.ToFunction.InvocationBatchRequest. + // Response sent from the function, as a result of an io.statefun.sdk.reqreply.ToFunction.InvocationBatchRequest. // It can be one of the following types: - // - org.apache.flink.statefun.flink.core.polyglot.FromFunction.InvocationResponse - // - org.apache.flink.statefun.flink.core.polyglot.FromFunction.IncompleteInvocationContext + // - io.statefun.sdk.reqreply.FromFunction.InvocationResponse + // - io.statefun.sdk.reqreply.FromFunction.IncompleteInvocationContext oneof response { InvocationResponse invocation_result = 100; IncompleteInvocationContext incomplete_invocation_context = 101;
