This is an automated email from the ASF dual-hosted git repository.
baodi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new ce28ddaacc5 [improve][io] Add notifyError method on PushSource. (#20791)
ce28ddaacc5 is described below
commit ce28ddaacc58dac4f69a8f48947f53056131b74e
Author: Baodi Shi <[email protected]>
AuthorDate: Thu Jul 13 11:04:48 2023 +0800
[improve][io] Add notifyError method on PushSource. (#20791)
---
...atchPushSource.java => AbstractPushSource.java} | 20 ++----
.../org/apache/pulsar/io/core/BatchPushSource.java | 79 +---------------------
.../java/org/apache/pulsar/io/core/PushSource.java | 45 +-----------
.../org/apache/pulsar/io/core/PushSourceTest.java | 43 ++++++++++++
4 files changed, 54 insertions(+), 133 deletions(-)
diff --git
a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/BatchPushSource.java
b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/AbstractPushSource.java
similarity index 80%
copy from
pulsar-io/core/src/main/java/org/apache/pulsar/io/core/BatchPushSource.java
copy to
pulsar-io/core/src/main/java/org/apache/pulsar/io/core/AbstractPushSource.java
index edf7b2756dd..185d1cebfbc 100644
---
a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/BatchPushSource.java
+++
b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/AbstractPushSource.java
@@ -19,19 +19,12 @@
package org.apache.pulsar.io.core;
import java.util.concurrent.LinkedBlockingQueue;
-import org.apache.pulsar.common.classification.InterfaceAudience;
-import org.apache.pulsar.common.classification.InterfaceStability;
import org.apache.pulsar.functions.api.Record;
/**
- * Pulsar's Batch Push Source interface. Batch Push Sources have the same lifecycle
- * as the regular BatchSource, aka discover, prepare. The reason its called Push is
- * because BatchPushSource can emit a record using the consume method that they
- * invoke whenever they have data to be published to Pulsar.
+ * Pulsar's Push Source Abstract.
*/
-@InterfaceAudience.Public
-@InterfaceStability.Evolving
-public abstract class BatchPushSource<T> implements BatchSource<T> {
+public abstract class AbstractPushSource<T> {
private static class NullRecord implements Record {
@Override
@@ -41,7 +34,7 @@ public abstract class BatchPushSource<T> implements
BatchSource<T> {
}
private static class ErrorNotifierRecord implements Record {
- private Exception e;
+ private final Exception e;
public ErrorNotifierRecord(Exception e) {
this.e = e;
}
@@ -59,12 +52,11 @@ public abstract class BatchPushSource<T> implements
BatchSource<T> {
private static final int DEFAULT_QUEUE_LENGTH = 1000;
private final NullRecord nullRecord = new NullRecord();
- public BatchPushSource() {
+ public AbstractPushSource() {
this.queue = new LinkedBlockingQueue<>(this.getQueueLength());
}
- @Override
- public Record<T> readNext() throws Exception {
+ protected Record<T> readNext() throws Exception {
Record<T> record = queue.take();
if (record instanceof ErrorNotifierRecord) {
throw ((ErrorNotifierRecord) record).getException();
@@ -109,4 +101,4 @@ public abstract class BatchPushSource<T> implements
BatchSource<T> {
public void notifyError(Exception ex) {
consume(new ErrorNotifierRecord(ex));
}
-}
\ No newline at end of file
+}
diff --git
a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/BatchPushSource.java
b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/BatchPushSource.java
index edf7b2756dd..6a145b66ff0 100644
---
a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/BatchPushSource.java
+++
b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/BatchPushSource.java
@@ -18,7 +18,6 @@
*/
package org.apache.pulsar.io.core;
-import java.util.concurrent.LinkedBlockingQueue;
import org.apache.pulsar.common.classification.InterfaceAudience;
import org.apache.pulsar.common.classification.InterfaceStability;
import org.apache.pulsar.functions.api.Record;
@@ -31,82 +30,10 @@ import org.apache.pulsar.functions.api.Record;
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
-public abstract class BatchPushSource<T> implements BatchSource<T> {
-
- private static class NullRecord implements Record {
- @Override
- public Object getValue() {
- return null;
- }
- }
-
- private static class ErrorNotifierRecord implements Record {
- private Exception e;
- public ErrorNotifierRecord(Exception e) {
- this.e = e;
- }
- @Override
- public Object getValue() {
- return null;
- }
-
- public Exception getException() {
- return e;
- }
- }
-
- private LinkedBlockingQueue<Record<T>> queue;
- private static final int DEFAULT_QUEUE_LENGTH = 1000;
- private final NullRecord nullRecord = new NullRecord();
-
- public BatchPushSource() {
- this.queue = new LinkedBlockingQueue<>(this.getQueueLength());
- }
+public abstract class BatchPushSource<T> extends AbstractPushSource<T> implements BatchSource<T> {
@Override
public Record<T> readNext() throws Exception {
- Record<T> record = queue.take();
- if (record instanceof ErrorNotifierRecord) {
- throw ((ErrorNotifierRecord) record).getException();
- }
- if (record instanceof NullRecord) {
- return null;
- } else {
- return record;
- }
- }
-
- /**
- * Send this message to be written to Pulsar.
- * Pass null if you you are done with this task
- * @param record next message from source which should be sent to a Pulsar
topic
- */
- public void consume(Record<T> record) {
- try {
- if (record != null) {
- queue.put(record);
- } else {
- queue.put(nullRecord);
- }
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
- }
- }
-
- /**
- * Get length of the queue that records are push onto.
- * Users can override this method to customize the queue length
- * @return queue length
- */
- public int getQueueLength() {
- return DEFAULT_QUEUE_LENGTH;
- }
-
- /**
- * Allows the source to notify errors asynchronously.
- * @param ex
- */
- public void notifyError(Exception ex) {
- consume(new ErrorNotifierRecord(ex));
+ return super.readNext();
}
-}
\ No newline at end of file
+}
diff --git
a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/PushSource.java
b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/PushSource.java
index dbd4dc4e1e9..6acccdda121 100644
--- a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/PushSource.java
+++ b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/PushSource.java
@@ -18,8 +18,6 @@
*/
package org.apache.pulsar.io.core;
-import java.util.Map;
-import java.util.concurrent.LinkedBlockingQueue;
import org.apache.pulsar.common.classification.InterfaceAudience;
import org.apache.pulsar.common.classification.InterfaceStability;
import org.apache.pulsar.functions.api.Record;
@@ -38,49 +36,10 @@ import org.apache.pulsar.functions.api.Record;
*/
@InterfaceAudience.Public
@InterfaceStability.Stable
-public abstract class PushSource<T> implements Source<T> {
-
- private LinkedBlockingQueue<Record<T>> queue;
- private static final int DEFAULT_QUEUE_LENGTH = 1000;
-
- public PushSource() {
- this.queue = new LinkedBlockingQueue<>(this.getQueueLength());
- }
+public abstract class PushSource<T> extends AbstractPushSource<T> implements Source<T> {
@Override
public Record<T> read() throws Exception {
- return queue.take();
- }
-
- /**
- * Open connector with configuration.
- *
- * @param config initialization config
- * @param sourceContext environment where the source connector is running
- * @throws Exception IO type exceptions when opening a connector
- */
- public abstract void open(Map<String, Object> config, SourceContext
sourceContext) throws Exception;
-
- /**
- * Attach a consumer function to this Source. This is invoked by the
implementation
- * to pass messages whenever there is data to be pushed to Pulsar.
- *
- * @param record next message from source which should be sent to a Pulsar
topic
- */
- public void consume(Record<T> record) {
- try {
- queue.put(record);
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
- }
- }
-
- /**
- * Get length of the queue that records are push onto.
- * Users can override this method to customize the queue length
- * @return queue length
- */
- public int getQueueLength() {
- return DEFAULT_QUEUE_LENGTH;
+ return super.readNext();
}
}
diff --git
a/pulsar-io/core/src/test/java/org/apache/pulsar/io/core/PushSourceTest.java
b/pulsar-io/core/src/test/java/org/apache/pulsar/io/core/PushSourceTest.java
new file mode 100644
index 00000000000..3c23e6401f0
--- /dev/null
+++ b/pulsar-io/core/src/test/java/org/apache/pulsar/io/core/PushSourceTest.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.core;
+
+import java.util.Map;
+import org.testng.annotations.Test;
+
+public class PushSourceTest {
+
+ PushSource testBatchSource = new PushSource() {
+ @Override
+ public void open(Map config, SourceContext context) throws Exception {
+
+ }
+
+ @Override
+ public void close() throws Exception {
+
+ }
+ };
+
+ @Test(expectedExceptions = RuntimeException.class, expectedExceptionsMessageRegExp = "test exception")
+ public void testNotifyErrors() throws Exception {
+ testBatchSource.notifyError(new RuntimeException("test exception"));
+ testBatchSource.readNext();
+ }
+}