HuangXingBo commented on a change in pull request #18388:
URL: https://github.com/apache/flink/pull/18388#discussion_r793480748



##########
File path: flink-python/pyflink/datastream/tests/test_connectors.py
##########
@@ -16,36 +16,49 @@
 # limitations under the License.
 
################################################################################
 
-from pyflink.common import typeinfo, Duration
+from pyflink.common import typeinfo, Duration, WatermarkStrategy, ConfigOptions
 from pyflink.common.serialization import JsonRowDeserializationSchema, \
-    JsonRowSerializationSchema, Encoder
+    JsonRowSerializationSchema, Encoder, SimpleStringSchema
 from pyflink.common.typeinfo import Types
 from pyflink.datastream import StreamExecutionEnvironment
 from pyflink.datastream.connectors import FlinkKafkaConsumer, 
FlinkKafkaProducer, JdbcSink, \
     JdbcConnectionOptions, JdbcExecutionOptions, StreamingFileSink, \
     OutputFileConfig, FileSource, StreamFormat, FileEnumeratorProvider, 
FileSplitAssignerProvider, \
     NumberSequenceSource, RollingPolicy, FileSink, BucketAssigner, RMQSink, 
RMQSource, \
-    RMQConnectionConfig
+    RMQConnectionConfig, PulsarSource, StartCursor, 
PulsarDeserializationSchema, StopCursor, \
+    SubscriptionType
 from pyflink.datastream.tests.test_util import DataStreamTestSinkFunction
 from pyflink.java_gateway import get_gateway
 from pyflink.testing.test_case_utils import PyFlinkTestCase, 
_load_specific_flink_module_jars, \
     invoke_java_object_method
 from pyflink.util.java_utils import load_java_class, get_field_value
 
 
-class FlinkKafkaTest(PyFlinkTestCase):
+class ConnectorTestBase(PyFlinkTestCase):
 
     def setUp(self) -> None:
         self.env = StreamExecutionEnvironment.get_execution_environment()
-        self.env.set_parallelism(2)
         # Cache current ContextClassLoader, we will replace it with a 
temporary URLClassLoader to
         # load specific connector jars with given module path to do dependency 
isolation. And We
         # will change the ClassLoader back to the cached ContextClassLoader 
after the test case
         # finished.
         self._cxt_clz_loader = 
get_gateway().jvm.Thread.currentThread().getContextClassLoader()
+        _load_specific_flink_module_jars(self._jars_relative_path)

Review comment:
       What about adding a method with the @classmethod and @abstractmethod 
decorators? Otherwise, there is no way to ensure that a newly added 
ConnectorTest knows to define the variable _jars_relative_path.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to