affo commented on code in PR #27:
URL: 
https://github.com/apache/flink-connector-gcp-pubsub/pull/27#discussion_r1602735147


##########
flink-connector-gcp-pubsub/src/main/java/org/apache/flink/connector/gcp/pubsub/sink/PubSubWriter.java:
##########
@@ -0,0 +1,195 @@
+package org.apache.flink.connector.gcp.pubsub.sink;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.operators.MailboxExecutor;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.connector.sink2.SinkWriter;
+import org.apache.flink.api.connector.sink2.WriterInitContext;
+import org.apache.flink.connector.gcp.pubsub.sink.config.GcpPublisherConfig;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.Preconditions;
+
+import com.google.api.core.ApiFuture;
+import com.google.api.core.ApiFutureCallback;
+import com.google.api.core.ApiFutures;
+import com.google.cloud.pubsub.v1.Publisher;
+import com.google.protobuf.ByteString;
+import com.google.pubsub.v1.PubsubMessage;
+import com.google.pubsub.v1.TopicName;
+
+import java.io.IOException;
+import java.util.Optional;
+
+import static org.apache.flink.util.concurrent.Executors.directExecutor;
+
+/**
+ * A stateless {@link SinkWriter} that writes records to Pub/Sub using generic 
{@link

Review Comment:
   Following up on @snuyanzin's comments, let's keep the naming consistent: 
use either `Pub/Sub` or `PubSub` throughout.



##########
flink-connector-gcp-pubsub/src/main/java/org/apache/flink/connector/gcp/pubsub/sink/PubSubWriter.java:
##########
@@ -0,0 +1,195 @@
+package org.apache.flink.connector.gcp.pubsub.sink;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.operators.MailboxExecutor;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.connector.sink2.SinkWriter;
+import org.apache.flink.api.connector.sink2.WriterInitContext;
+import org.apache.flink.connector.gcp.pubsub.sink.config.GcpPublisherConfig;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.Preconditions;
+
+import com.google.api.core.ApiFuture;
+import com.google.api.core.ApiFutureCallback;
+import com.google.api.core.ApiFutures;
+import com.google.cloud.pubsub.v1.Publisher;
+import com.google.protobuf.ByteString;
+import com.google.pubsub.v1.PubsubMessage;
+import com.google.pubsub.v1.TopicName;
+
+import java.io.IOException;
+import java.util.Optional;
+
+import static org.apache.flink.util.concurrent.Executors.directExecutor;
+
+/**
+ * A stateless {@link SinkWriter} that writes records to Pub/Sub using generic 
{@link
+ * GcpWriterClient}. The writer blocks on completion of inflight requests on 
{@code flush()} and
+ * {@code close()}. The writer also uses {@code maxInFlightRequests} and 
blocks new writes if the
+ *  number of inflight requests exceeds the specified limit.
+ *
+ * @param <T> The type of the records .
+ */
+@Internal
+public class PubSubWriter<T> implements SinkWriter<T> {
+
+    /** The PubSub generic client to publish messages. */
+    private final GcpWriterClient<PubsubMessage> publisher;
+
+    /**
+     * The maximum number of inflight requests, The writer blocks new writes 
if the number of
+     * inflight requests exceeds the specified limit.
+     */
+    private final long maximumInflightRequests;
+
+    /**
+     * Flag to indicate whether to fail on errors, if unset the writer will 
retry non-fatal request
+     * failures.
+     */
+    private final boolean failOnError;
+
+    private long inflightRequests = 0;
+
+    private final MailboxExecutor mailboxExecutor;
+    private final Counter numBytesOutCounter;
+    private final Counter numRecordsOutCounter;
+    private final Counter numRecordsOutErrorCounter;
+    private final SerializationSchema<T> serializationSchema;
+
+    PubSubWriter(
+            String projectId,
+            String topicId,
+            SerializationSchema<T> serializationSchema,
+            WriterInitContext context,
+            GcpPublisherConfig publisherConfig,
+            long maximumInflightRequests,
+            boolean failOnError)
+            throws IOException {
+        this(
+                createPublisher(projectId, topicId, publisherConfig),
+                context,
+                serializationSchema,
+                maximumInflightRequests,
+                failOnError);
+    }
+
+    @VisibleForTesting
+    PubSubWriter(
+            GcpWriterClient<PubsubMessage> publisher,
+            WriterInitContext context,
+            SerializationSchema<T> serializationSchema,
+            long maximumInflightRequests,
+            boolean failOnError) {
+        this.publisher = Preconditions.checkNotNull(publisher);
+        this.serializationSchema = 
Preconditions.checkNotNull(serializationSchema);
+        Preconditions.checkNotNull(context, "Context cannot be null.");
+
+        this.mailboxExecutor = context.getMailboxExecutor();
+        this.numBytesOutCounter = 
context.metricGroup().getIOMetricGroup().getNumBytesOutCounter();
+        this.numRecordsOutCounter =
+                
context.metricGroup().getIOMetricGroup().getNumRecordsOutCounter();
+        this.numRecordsOutErrorCounter = 
context.metricGroup().getNumRecordsOutErrorsCounter();
+        this.maximumInflightRequests = maximumInflightRequests;
+        this.failOnError = failOnError;
+    }
+
+    @Override
+    public void write(T t, SinkWriter.Context context) throws IOException, 
InterruptedException {
+        awaitMaxInflightRequestsBelow(maximumInflightRequests);
+        PubsubMessage message =
+                PubsubMessage.newBuilder()
+                        
.setData(ByteString.copyFrom(serializationSchema.serialize(t)))
+                        .build();
+        publishMessage(message);
+    }
+
+    @Override
+    public void flush(boolean b) throws IOException, InterruptedException {
+        publisher.flush();
+        awaitMaxInflightRequestsBelow(1);

Review Comment:
   @vahmed-hamdy why do you need this additional `yield` when there are 
still in-flight messages? Can you clarify?
   
   If it is not obvious (or I am missing something), could you add a comment explaining it?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to