This is an automated email from the ASF dual-hosted git repository.

baodi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new ce28ddaacc5 [improve][io] Add notifyError method on PushSource. 
(#20791)
ce28ddaacc5 is described below

commit ce28ddaacc58dac4f69a8f48947f53056131b74e
Author: Baodi Shi <[email protected]>
AuthorDate: Thu Jul 13 11:04:48 2023 +0800

    [improve][io] Add notifyError method on PushSource. (#20791)
---
 ...atchPushSource.java => AbstractPushSource.java} | 20 ++----
 .../org/apache/pulsar/io/core/BatchPushSource.java | 79 +---------------------
 .../java/org/apache/pulsar/io/core/PushSource.java | 45 +-----------
 .../org/apache/pulsar/io/core/PushSourceTest.java  | 43 ++++++++++++
 4 files changed, 54 insertions(+), 133 deletions(-)

diff --git 
a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/BatchPushSource.java 
b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/AbstractPushSource.java
similarity index 80%
copy from 
pulsar-io/core/src/main/java/org/apache/pulsar/io/core/BatchPushSource.java
copy to 
pulsar-io/core/src/main/java/org/apache/pulsar/io/core/AbstractPushSource.java
index edf7b2756dd..185d1cebfbc 100644
--- 
a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/BatchPushSource.java
+++ 
b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/AbstractPushSource.java
@@ -19,19 +19,12 @@
 package org.apache.pulsar.io.core;
 
 import java.util.concurrent.LinkedBlockingQueue;
-import org.apache.pulsar.common.classification.InterfaceAudience;
-import org.apache.pulsar.common.classification.InterfaceStability;
 import org.apache.pulsar.functions.api.Record;
 
 /**
- * Pulsar's Batch Push Source interface. Batch Push Sources have the same 
lifecycle
- * as the regular BatchSource, aka discover, prepare. The reason its called 
Push is
- * because BatchPushSource can emit a record using the consume method that they
- * invoke whenever they have data to be published to Pulsar.
+ * Pulsar's Push Source Abstract.
  */
[email protected]
[email protected]
-public abstract class BatchPushSource<T> implements BatchSource<T> {
+public abstract class AbstractPushSource<T> {
 
     private static class NullRecord implements Record {
         @Override
@@ -41,7 +34,7 @@ public abstract class BatchPushSource<T> implements 
BatchSource<T> {
     }
 
     private static class ErrorNotifierRecord implements Record {
-        private Exception e;
+        private final Exception e;
         public ErrorNotifierRecord(Exception e) {
             this.e = e;
         }
@@ -59,12 +52,11 @@ public abstract class BatchPushSource<T> implements 
BatchSource<T> {
     private static final int DEFAULT_QUEUE_LENGTH = 1000;
     private final NullRecord nullRecord = new NullRecord();
 
-    public BatchPushSource() {
+    public AbstractPushSource() {
         this.queue = new LinkedBlockingQueue<>(this.getQueueLength());
     }
 
-    @Override
-    public Record<T> readNext() throws Exception {
+    protected Record<T> readNext() throws Exception {
         Record<T> record = queue.take();
         if (record instanceof ErrorNotifierRecord) {
             throw ((ErrorNotifierRecord) record).getException();
@@ -109,4 +101,4 @@ public abstract class BatchPushSource<T> implements 
BatchSource<T> {
     public void notifyError(Exception ex) {
         consume(new ErrorNotifierRecord(ex));
     }
-}
\ No newline at end of file
+}
diff --git 
a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/BatchPushSource.java 
b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/BatchPushSource.java
index edf7b2756dd..6a145b66ff0 100644
--- 
a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/BatchPushSource.java
+++ 
b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/BatchPushSource.java
@@ -18,7 +18,6 @@
  */
 package org.apache.pulsar.io.core;
 
-import java.util.concurrent.LinkedBlockingQueue;
 import org.apache.pulsar.common.classification.InterfaceAudience;
 import org.apache.pulsar.common.classification.InterfaceStability;
 import org.apache.pulsar.functions.api.Record;
@@ -31,82 +30,10 @@ import org.apache.pulsar.functions.api.Record;
  */
 @InterfaceAudience.Public
 @InterfaceStability.Evolving
-public abstract class BatchPushSource<T> implements BatchSource<T> {
-
-    private static class NullRecord implements Record {
-        @Override
-        public Object getValue() {
-            return null;
-        }
-    }
-
-    private static class ErrorNotifierRecord implements Record {
-        private Exception e;
-        public ErrorNotifierRecord(Exception e) {
-            this.e = e;
-        }
-        @Override
-        public Object getValue() {
-            return null;
-        }
-
-        public Exception getException() {
-            return e;
-        }
-    }
-
-    private LinkedBlockingQueue<Record<T>> queue;
-    private static final int DEFAULT_QUEUE_LENGTH = 1000;
-    private final NullRecord nullRecord = new NullRecord();
-
-    public BatchPushSource() {
-        this.queue = new LinkedBlockingQueue<>(this.getQueueLength());
-    }
+public abstract class BatchPushSource<T> extends AbstractPushSource<T> 
implements BatchSource<T> {
 
     @Override
     public Record<T> readNext() throws Exception {
-        Record<T> record = queue.take();
-        if (record instanceof ErrorNotifierRecord) {
-            throw ((ErrorNotifierRecord) record).getException();
-        }
-        if (record instanceof NullRecord) {
-            return null;
-        } else {
-            return record;
-        }
-    }
-
-    /**
-     * Send this message to be written to Pulsar.
-     * Pass null if you you are done with this task
-     * @param record next message from source which should be sent to a Pulsar 
topic
-     */
-    public void consume(Record<T> record) {
-        try {
-            if (record != null) {
-                queue.put(record);
-            } else {
-                queue.put(nullRecord);
-            }
-        } catch (InterruptedException e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    /**
-     * Get length of the queue that records are push onto.
-     * Users can override this method to customize the queue length
-     * @return queue length
-     */
-    public int getQueueLength() {
-        return DEFAULT_QUEUE_LENGTH;
-    }
-
-    /**
-     * Allows the source to notify errors asynchronously.
-     * @param ex
-     */
-    public void notifyError(Exception ex) {
-        consume(new ErrorNotifierRecord(ex));
+        return super.readNext();
     }
-}
\ No newline at end of file
+}
diff --git 
a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/PushSource.java 
b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/PushSource.java
index dbd4dc4e1e9..6acccdda121 100644
--- a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/PushSource.java
+++ b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/PushSource.java
@@ -18,8 +18,6 @@
  */
 package org.apache.pulsar.io.core;
 
-import java.util.Map;
-import java.util.concurrent.LinkedBlockingQueue;
 import org.apache.pulsar.common.classification.InterfaceAudience;
 import org.apache.pulsar.common.classification.InterfaceStability;
 import org.apache.pulsar.functions.api.Record;
@@ -38,49 +36,10 @@ import org.apache.pulsar.functions.api.Record;
  */
 @InterfaceAudience.Public
 @InterfaceStability.Stable
-public abstract class PushSource<T> implements Source<T> {
-
-    private LinkedBlockingQueue<Record<T>> queue;
-    private static final int DEFAULT_QUEUE_LENGTH = 1000;
-
-    public PushSource() {
-        this.queue = new LinkedBlockingQueue<>(this.getQueueLength());
-    }
+public abstract class PushSource<T> extends AbstractPushSource<T> implements 
Source<T> {
 
     @Override
     public Record<T> read() throws Exception {
-        return queue.take();
-    }
-
-    /**
-     * Open connector with configuration.
-     *
-     * @param config initialization config
-     * @param sourceContext environment where the source connector is running
-     * @throws Exception IO type exceptions when opening a connector
-     */
-    public abstract void open(Map<String, Object> config, SourceContext 
sourceContext) throws Exception;
-
-    /**
-     * Attach a consumer function to this Source. This is invoked by the 
implementation
-     * to pass messages whenever there is data to be pushed to Pulsar.
-     *
-     * @param record next message from source which should be sent to a Pulsar 
topic
-     */
-    public void consume(Record<T> record) {
-        try {
-            queue.put(record);
-        } catch (InterruptedException e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    /**
-     * Get length of the queue that records are push onto.
-     * Users can override this method to customize the queue length
-     * @return queue length
-     */
-    public int getQueueLength() {
-        return DEFAULT_QUEUE_LENGTH;
+        return super.readNext();
     }
 }
diff --git 
a/pulsar-io/core/src/test/java/org/apache/pulsar/io/core/PushSourceTest.java 
b/pulsar-io/core/src/test/java/org/apache/pulsar/io/core/PushSourceTest.java
new file mode 100644
index 00000000000..3c23e6401f0
--- /dev/null
+++ b/pulsar-io/core/src/test/java/org/apache/pulsar/io/core/PushSourceTest.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.core;
+
+import java.util.Map;
+import org.testng.annotations.Test;
+
+public class PushSourceTest {
+
+  PushSource testBatchSource = new PushSource() {
+    @Override
+    public void open(Map config, SourceContext context) throws Exception {
+
+    }
+
+    @Override
+    public void close() throws Exception {
+
+    }
+  };
+
+  @Test(expectedExceptions = RuntimeException.class, 
expectedExceptionsMessageRegExp = "test exception")
+  public void testNotifyErrors() throws Exception {
+    testBatchSource.notifyError(new RuntimeException("test exception"));
+    testBatchSource.readNext();
+  }
+}

Reply via email to