pengmide commented on code in PR #19945:
URL: https://github.com/apache/flink/pull/19945#discussion_r907976427


##########
flink-python/pyflink/datastream/connectors/cassandra.py:
##########
@@ -0,0 +1,1648 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+from enum import Enum
+from typing import List, Union
+
+from pyflink.common import Duration, Configuration
+from pyflink.java_gateway import get_gateway
+from pyflink.util.java_utils import to_jarray
+
+# ---- Classes introduced to construct the MapperOptions ----
+
+
+class ConsistencyLevel(Enum):
+    """
+    The consistency level
+    """
+    ANY = 0
+    ONE = 1
+    TWO = 2
+    THREE = 3
+    QUORUM = 4
+    ALL = 5
+    LOCAL_QUORUM = 6
+    EACH_QUORUM = 7
+    SERIAL = 8
+    LOCAL_SERIAL = 9
+    LOCAL_ONE = 10
+
+    def _to_j_consistency_level(self):
+        JConsistencyLevel = 
get_gateway().jvm.com.datastax.driver.core.ConsistencyLevel
+        return getattr(JConsistencyLevel, self.name)
+
+
+# ---- Classes introduced to construct the ClusterBuilder ----
+
+
+class ProtocolVersion(Enum):
+    """
+    Versions of the native protocol supported by the driver.
+    """
+    V1 = 0
+    V2 = 1
+    V3 = 2
+    v4 = 3
+    V5 = 4
+    V6 = 5
+
+    def _to_j_protocol_version(self):
+        JProtocolVersion = 
get_gateway().jvm.com.datastax.driver.core.ProtocolVersion
+        return getattr(JProtocolVersion, self.name)
+
+
+class InetAddress(object):
+    """
+    This class represents an Internet Protocol (IP) address.
+    """
+    def __init__(self, j_inet_address):
+        self._j_inet_address = j_inet_address
+
+
+class InetSocketAddress(object):
+    """
+    This class implements an IP Socket Address (IP address + port number) It 
can also be a pair
+    (hostname + port number), in which case an attempt will be made to resolve 
the hostname.
+    If resolution fails then the address is said to be unresolved but can 
still be used on some
+    circumstances like connecting through a proxy.
+
+    It provides an immutable object used by sockets for binding, connecting, 
or as returned values.
+
+    The wildcard is a special local IP address. It usually means "any" and can 
only be used for bind
+    operations.
+    """
+    def __init__(self, j_inet_socket_address):
+        self._j_inet_socket_address = j_inet_socket_address
+
+
+class LoadBalancingPolicy(object):
+    """
+    The policy that decides which Cassandra hosts to contact for each new 
query.
+
+    The LoadBalancingPolicy is informed of hosts up/down events. For 
efficiency purposes, the policy
+    is expected to exclude down hosts from query plans.
+    """
+
+    def __init__(self, j_load_balancing_policy):
+        self._j_load_balancing_policy = j_load_balancing_policy
+
+    @staticmethod
+    def default_load_balancing_policy() -> 'LoadBalancingPolicy':
+        """
+        The default load balancing policy.
+
+        The default load balancing policy is DCAwareRoundRobinPolicy with 
token awareness.
+        """
+        JPolicies = 
get_gateway().jvm.com.datastax.driver.core.policies.Policies
+        return LoadBalancingPolicy(JPolicies.defaultLoadBalancingPolicy())
+
+    @staticmethod
+    def round_robin_policy() -> 'LoadBalancingPolicy':
+        """
+        A Round-robin load balancing policy.
+
+        This policy queries nodes in a round-robin fashion. For a given query, 
if an host fail, the
+        next one (following the round-robin order) is tried, until all hosts 
have been tried.
+
+        This policy is not datacenter aware and will include every known 
Cassandra hosts in its
+        round-robin algorithm. If you use multiple datacenter this will be 
inefficient, and you will
+        want to use the DCAwareRoundRobinPolicy load balancing policy instead.
+        """
+        JRoundRobinPolicy = 
get_gateway().jvm.com.datastax.driver.core.policies.RoundRobinPolicy
+        return LoadBalancingPolicy(JRoundRobinPolicy())
+
+
+class ReconnectionPolicy(object):
+    """
+    Policy that decides how often the reconnection to a dead node is attempted.
+
+    Note that if the driver receives a push notification from the Cassandra 
cluster that a node is
+    UP, any existing ReconnectionSchedule on that node will be cancelled and a 
new one will be
+    created (in effect, the driver reset the scheduler).
+
+    The default ExponentialReconnectionPolicy policy is usually adequate.
+    """
+
+    def __init__(self, j_reconnection_policy):
+        self._j_reconnection_policy = j_reconnection_policy
+
+    @staticmethod
+    def default_reconnection_policy() -> 'ReconnectionPolicy':
+        """
+        The default reconnection policy.
+
+        The default reconnection policy is an ExponentialReconnectionPolicy 
where the base delay is
+        1 second and the max delay is 10 minutes.
+        """
+        JPolicies = 
get_gateway().jvm.com.datastax.driver.core.policies.Policies
+        return ReconnectionPolicy(JPolicies.defaultReconnectionPolicy())
+
+
+class RetryPolicy(object):
+    """
+    A policy that defines a default behavior to adopt when a request fails.
+
+    There are three possible decisions:
+    - RETHROW: no retry should be attempted and an exception should be thrown.
+    - RETRY: the operation will be retried. The consistency level of the retry 
should be specified.
+    - IGNORE: no retry should be attempted and the exception should be 
ignored. In that case, the
+              operation that triggered the Cassandra exception will return an 
empty result set.
+    """
+
+    def __init__(self, j_retry_policy):
+        self._j_retry_policy = j_retry_policy
+
+    @staticmethod
+    def default_retry_policy() -> 'RetryPolicy':
+        """
+        The default retry policy.
+        """
+        JPolicies = 
get_gateway().jvm.com.datastax.driver.core.policies.Policies
+        return RetryPolicy(JPolicies.defaultRetryPolicy())
+
+    @staticmethod
+    def fallthrough_retry_policy() -> 'RetryPolicy':
+        """
+        A retry policy that never retries (nor ignores).
+        """
+        JFallthroughRetryPolicy = 
get_gateway().jvm.com.datastax.driver.core.policies.\
+            FallthroughRetryPolicy
+        return RetryPolicy(JFallthroughRetryPolicy.INSTANCE)
+
+
+class AddressTranslator(object):
+    """
+    Translates IP addresses received from Cassandra nodes into locally 
queryable addresses.
+
+    The driver auto-detect new Cassandra nodes added to the cluster through 
server side pushed
+    notifications and through checking the system tables. For each node, the 
address the driver
+    will receive will correspond to the address set as rpc_address in the node 
yaml file.
+    In most case, this is the correct address to use by the driver and that is 
what is used by
+    default. However, sometimes the addresses received through this mechanism 
will either not be
+    reachable directly by the driver or should not be the preferred address to 
use to reach the
+    node (for instance, the rpc_address set on Cassandra nodes might be a 
private IP, but some
+    clients may have to use a public IP, or pass by a router to reach that 
node). This interface
+    allows to deal with such cases, by allowing to translate an address as 
sent by a Cassandra node
+    to another address to be used by the driver for connection.
+
+    Please note that the contact points addresses provided while creating the 
Cluster instance are
+    not "translated", only IP address retrieved from or sent by Cassandra 
nodes to the driver are.
+    """
+
+    def __init__(self, j_address_translator):
+        self._j_address_translator = j_address_translator
+
+    @staticmethod
+    def default_address_translator() -> 'AddressTranslator':
+        """
+        The default timestamp generator.
+
+        This is an instance of ServerSideTimestampGenerator.
+        """
+        JPolicies = 
get_gateway().jvm.com.datastax.driver.core.policies.Policies
+        return AddressTranslator(JPolicies.defaultAddressTranslator())
+
+    @staticmethod
+    def ec2_multi_region_address_translator() -> 'AddressTranslator':
+        """
+        AddressTranslator implementation for a multi-region EC2 deployment 
where clients are also
+        deployed in EC2.
+        """
+        JEC2MultiRegionAddressTranslator = get_gateway().jvm.\
+            com.datastax.driver.core.policies.EC2MultiRegionAddressTranslator
+        return AddressTranslator(JEC2MultiRegionAddressTranslator())
+
+
+class TimestampGenerator(object):
+    """
+    Generates client-side, microsecond-precision query timestamps.
+
+    Given that Cassandra uses those timestamps to resolve conflicts, 
implementations should generate
+    incrementing timestamps for successive implementations.
+    """
+
+    def __init__(self, j_timestamp_generator):
+        self._j_timestamp_generator = j_timestamp_generator
+
+    @staticmethod
+    def default_timestamp_generator() -> 'TimestampGenerator':
+        """
+        The default speculative retry policy.
+
+        The default speculative retry policy is a NoSpeculativeExecutionPolicy.
+        """
+        JPolicies = 
get_gateway().jvm.com.datastax.driver.core.policies.Policies
+        return TimestampGenerator(JPolicies.defaultTimestampGenerator())
+
+    @staticmethod
+    def atomic_monotonic_timestamp_generator() -> 'TimestampGenerator':
+        """
+        A timestamp generator that guarantees monotonically increasing 
timestamps among all client
+        threads, and logs warnings when timestamps drift in the future.
+        """
+        JAtomicMonotonicTimestampGenerator = get_gateway().jvm.\
+            com.datastax.driver.core.AtomicMonotonicTimestampGenerator
+        return TimestampGenerator(JAtomicMonotonicTimestampGenerator())
+
+    @staticmethod
+    def server_side_timestamp_generator() -> 'TimestampGenerator':
+        """
+        A timestamp generator that always returns Java Long.MIN_VALUE, in 
order to let Cassandra
+        assign server-side timestamps.
+        """
+        JServerSideTimestampGenerator = get_gateway().jvm.\
+            com.datastax.driver.core.ServerSideTimestampGenerator
+        return TimestampGenerator(JServerSideTimestampGenerator.INSTANCE)
+
+
+class CodecRegistry(object):
+    """
+    A registry for TypeCodecs.
+    """
+
+    def __init__(self, j_codec_registry):
+        self._j_codec_registry = j_codec_registry
+
+    @staticmethod
+    def default_codec_registry() -> 'CodecRegistry':
+        """
+        The default CodecRegistry instance.
+
+        It will be shared among all Cluster instances that were not explicitly 
built with a
+        different instance.
+        """
+        JCodecRegistry = 
get_gateway().jvm.com.datastax.driver.core.CodecRegistry
+        return CodecRegistry(JCodecRegistry.DEFAULT_INSTANCE)
+
+
+class AuthProvider(object):
+    """
+    Provides Authenticator instances for use when connecting to Cassandra 
nodes.
+    """
+
+    def __init__(self, j_auth_provider):
+        self._j_auth_provider = j_auth_provider
+
+    @staticmethod
+    def default_auth_provider(username: str, password: str) -> 'AuthProvider':
+        """
+        This provider allows to programmatically define authentication 
information that will then
+        apply to all hosts. The PlainTextAuthenticator instances it returns 
support SASL
+        authentication using the PLAIN mechanism for version 2 (or above) of 
the CQL native
+        protocol.
+        """
+        JAuthProvider = 
get_gateway().jvm.com.datastax.driver.core.PlainTextAuthProvider
+        return AuthProvider(JAuthProvider(username, password))
+
+
+class ProtocolOptionsCompression(Enum):
+    """
+    Compression supported by the Cassandra binary protocol.
+    """
+    NONE = 0
+    SNAPPY = 1
+    LZ4 = 2
+
+    def _to_j_protocol_options_compression(self):
+        JProtocolOptionsCompression = 
get_gateway().jvm.com.datastax.driver.core. \
+            ProtocolOptions.Compression
+        return getattr(JProtocolOptionsCompression, self.name)
+
+
+class SslContext(object):
+    """
+    The ssl context.
+    """
+
+    def __init__(self, j_ssl_context):
+        self._j_ssl_context = j_ssl_context
+
+
+class SSLOptions(object):
+    """
+    Defines how the driver configures SSL connections.
+    """
+
+    def __init__(self, j_ssl_options):
+        self._j_ssl_options = j_ssl_options
+
+    @staticmethod
+    def jdk_ssl_options() -> 'SSLOptions':
+        """
+        SSLOptions implementation based on built-in JDK classes.
+        """
+        JJdkSSLOptions = 
get_gateway().jvm.com.datastax.driver.core.JdkSSLOptions
+        return SSLOptions(JJdkSSLOptions.builder().build())
+
+    @staticmethod
+    def netty_ssl_options(context: SslContext) -> 'SSLOptions':
+        """
+        SSLOptions implementation based on Netty's SSL context.
+
+        Netty has the ability to use OpenSSL if available, instead of the 
JDK's built-in engine.
+        This yields better performance.
+        """
+        return SSLOptions(context._j_ssl_context)
+
+
+class StateListener(object):
+    """
+    listeners that are interested in hosts added, up, down and removed events.
+
+    It is possible for the same event to be fired multiple times, particularly 
for up or down
+    events. Therefore, a listener should ignore the same event if it has 
already been notified of
+    a node's state.
+    """
+
+    def __init__(self, j_state_listener):
+        self._j_state_listener = j_state_listener
+
+
+class ClusterManager(object):
+    """
+    The sessions and hosts managed by this a Cluster instance.
+
+    Note: the reason we create a Manager object separate from Cluster is that 
Manager is not
+    publicly visible. For instance, we wouldn't want user to be able to call 
the onUp and onDown
+    methods.
+    """
+
+    def __init__(self, cluster_name: str, contact_points: 
List[InetSocketAddress],
+                 configuration: Configuration, listeners: List[StateListener]):
+        self._j_cluster_manager = get_gateway().jvm.com.datastax.driver.core. \
+            ClusterBuilder.Manager(cluster_name, contact_points, 
configuration, listeners)
+
+
+class HostDistance(Enum):
+    """
+    The distance to a Cassandra node as assigned by a LoadBalancingPolicy 
(through its distance
+    method).
+
+    The distance assigned to an host influences how many connections the 
driver maintains towards
+    this host. If for a given host the assigned HostDistance is LOCAL or 
REMOTE, some connections
+    will be maintained by the driver to this host. More active connections 
will be kept to LOCAL
+    host than to a REMOTE one (and thus well behaving LoadBalancingPolicy 
should assign a REMOTE
+    distance only to hosts that are the less often queried).
+
+    However, if a host is assigned the distance IGNORED, no connection to that 
host will maintain
+    active. In other words, IGNORED should be assigned to host that should not 
be used by this
+    driver (because they are in a remote data center for instance).
+    """
+
+    LOCAL = 0
+    REMOTE = 1
+    IGNORED = 2
+
+    def _to_j_host_distance(self):
+        JHostDistance = get_gateway().jvm.com.datastax.driver.core.HostDistance
+        return getattr(JHostDistance, self.name)
+
+
+class Executor(object):
+    """
+    The Executor implementations provided in this package implement 
ExecutorService, which is a more
+    extensive interface. The ThreadPoolExecutor class provides an extensible 
thread pool
+    implementation. The Executors class provides convenient factory methods 
for these Executors.
+    """
+
+    def __init__(self, j_executor):
+        self._j_executor = j_executor
+
+
+class PoolingOptions(object):
+    """
+    Options related to connection pooling.
+
+    The driver uses connections in an asynchronous manner, meaning that 
multiple requests can be
+    submitted on the same connection at the same time. Therefore only a 
relatively small number of
+    connections is needed. For each host, the driver uses a connection pool 
that may have a variable
+    size (it will automatically adjust to the current load).
+    """
+
+    def __init__(self):
+        self._j_pooling_options = 
get_gateway().jvm.com.datastax.driver.core.PoolingOptions()
+
+    def register(self, manager: ClusterManager) -> 'PoolingOptions':
+        """
+        register ClusterManager
+        """
+        self._j_pooling_options.register(manager._j_cluster_manager)
+        return self
+
+    def set_core_connections_per_host(self, distance: HostDistance, 
new_core_connections: int) \
+            -> 'PoolingOptions':
+        """
+        Sets the core number of connections per host.
+
+        For the provided distance, this corresponds to the number of 
connections initially created
+        and kept open to each host of that distance.
+        """
+        self._j_pooling_options.setCoreConnectionsPerHost(
+            distance._to_j_host_distance(), new_core_connections)
+        return self
+
+    def set_max_connections_per_host(self, distance: HostDistance, 
new_max_connections: int) \
+            -> 'PoolingOptions':
+        """
+        Sets the maximum number of connections per host.
+
+        For the provided distance, this corresponds to the maximum number of 
connections that can be
+        created per host at that distance.
+        """
+        self._j_pooling_options.setMaxConnectionsPerHost(
+            distance._to_j_host_distance(), new_max_connections)
+        return self
+
+    def set_connections_per_host(self, distance: HostDistance, core: int, max: 
int) \
+            -> 'PoolingOptions':
+        """
+        Sets the core and maximum number of connections per host in one call.
+
+        This is a convenience method that is equivalent to calling 
set_core_connections_per_host(\
+        HostDistance, int) and set_max_connections_per_host(HostDistance, int).
+        """
+        
self._j_pooling_options.setConnectionsPerHost(distance._to_j_host_distance(), 
core, max)
+        return self
+
+    def set_new_connection_threshold(self, distance: HostDistance, new_value: 
int) \
+            -> 'PoolingOptions':
+        """
+        Sets the threshold that triggers the creation of a new connection to a 
host.
+        """
+        
self._j_pooling_options.setNewConnectionThreshold(distance._to_j_host_distance(),
 new_value)
+        return self
+
+    def set_max_requests_per_connection(self, distance: HostDistance, 
new_max_requests: int) \
+            -> 'PoolingOptions':
+        """
+        Sets the maximum number of requests per connection
+        """
+        self._j_pooling_options.setMaxRequestsPerConnection(
+            distance._to_j_host_distance(), new_max_requests)
+        return self
+
+    def set_idle_timeout_seconds(self, idle_timeout_seconds: int) -> 
'PoolingOptions':
+        """
+        Sets the timeout before an idle connection is removed.
+
+        The order of magnitude should be a few minutes (the default is 120 
seconds). The timeout
+        that triggers the removal has a granularity of 10 seconds.
+        """
+        self._j_pooling_options.setIdleTimeoutSeconds(idle_timeout_seconds)
+        return self
+
+    def set_pool_timeout_millis(self, pool_timeout_millis: int) -> 
'PoolingOptions':
+        """
+        Sets the timeout when trying to acquire a connection from a host's 
pool.
+
+        If no connection is available within that time, the driver will try 
the next host from the
+        query plan.
+
+        The default is 5 seconds. If this option is set to zero, the driver 
won't wait at all.
+        """
+        self._j_pooling_options.setPoolTimeoutMillis(pool_timeout_millis)
+        return self
+
+    def set_max_queue_size(self, max_queue_size: int):
+        """
+        Sets the maximum number of requests that get enqueued if no connection 
is available.
+        """
+        self._j_pooling_options.setMaxQueueSize(max_queue_size)
+        return self
+
+    def set_heartbeat_interval_seconds(self, heartbeat_interval_seconds: int) 
-> 'PoolingOptions':
+        """
+        Sets the heart beat interval, after which a message is sent on an idle 
connection to make
+        sure it's still alive.
+
+        This is an application-level keep-alive, provided for convenience 
since adjusting the TCP
+        keep-alive might not be practical in all environments.
+
+        The default value for this option is 30 seconds.
+        """
+        
self._j_pooling_options.setHeartbeatIntervalSeconds(heartbeat_interval_seconds)
+        return self
+
+    def set_initialization_executor(self, initialization_executor: Executor) 
-> 'PoolingOptions':
+        """
+        Sets the executor to use for connection initialization.
+
+        Connections are open in a completely asynchronous manner. Since 
initializing the transport
+        requires separate CQL queries, the futures representing the completion 
of these queries are
+        transformed and chained. This executor is where these transformations 
happen.
+
+        This is an advanced option, which should be rarely needed in practice.
+        """
+        
self._j_pooling_options.setInitializationExecutor(initialization_executor._j_executor)
+        return self
+
+    def set_protocol_version(self, protocol_version: ProtocolVersion) -> 
'PoolingOptions':
+        """
+        Sets protocol version.
+        """
+        
self._j_pooling_options.setProtocolVersion(protocol_version._to_j_protocol_version())
+        return self
+
+
+class SocketOptions(object):
+    """
+    Options to configure low-level socket options for the connections kept to 
the Cassandra hosts.
+    """
+
+    def __init__(self):
+        self._j_socket_options = 
get_gateway().jvm.com.datastax.driver.core.SocketOptions()
+
+    def set_connect_timeout_millis(self, connect_timeout_millis: int) -> 
'SocketOptions':
+        """
+        Sets the connection timeout in milliseconds.
+
+        The default value is 5000.
+        """
+        self._j_socket_options.setConnectTimeoutMillis(connect_timeout_millis)
+        return self
+
+    def set_read_timeout_millis(self, read_timeout_millis: int) -> 
'SocketOptions':
+        """
+        Sets the per-host read timeout in milliseconds.
+
+        Setting a value of 0 disables read timeouts.
+
+        The default value is 12000.
+        """
+        self._j_socket_options.setReadTimeoutMillis(read_timeout_millis)
+        return self
+
+    def set_keep_alive(self, keep_alive: bool) -> 'SocketOptions':
+        """
+        Sets whether to enable TCP keepalive.
+
+        By default, this option is not set by the driver. The actual value 
will be the default from
+        the underlying Netty transport (Java NIO or native epoll).
+        """
+        self._j_socket_options.setKeepAlive(keep_alive)
+        return self
+
+    def set_reuse_address(self, reuse_address: bool) -> 'SocketOptions':
+        """
+        Returns whether reuse-address is enabled.
+        """
+        self._j_socket_options.setReuseAddress(reuse_address)
+        return self
+
+    def set_so_linger(self, so_linger: int) -> 'SocketOptions':
+        """
+        Sets the linger-on-close timeout.
+
+        By default, this option is not set by the driver. The actual value 
will be the default from
+        the underlying Netty transport (Java NIO or native epoll).
+        """
+        self._j_socket_options.setSoLinger(so_linger)
+        return self
+
+    def set_tcp_no_delay(self, tcp_no_delay: bool) -> 'SocketOptions':
+        """
+        Sets whether to disable Nagle's algorithm.
+
+        By default, this option is set to true (Nagle disabled).
+        """
+        self._j_socket_options.setTcpNoDelay(tcp_no_delay)
+        return self
+
+    def set_receive_buffer_size(self, receive_buffer_size: int) -> 
'SocketOptions':
+        """
+        Sets a hint to the size of the underlying buffers for incoming network 
I/O.
+
+        By default, this option is not set by the driver. The actual value 
will be the default from
+        the underlying Netty transport (Java NIO or native epoll).
+        """
+        self._j_socket_options.setReceiveBufferSize(receive_buffer_size)
+        return self
+
+    def set_send_buffer_size(self, send_buffer_size: int) -> 'SocketOptions':
+        """
+        Sets a hint to the size of the underlying buffers for outgoing network 
I/O.
+
+        By default, this option is not set by the driver. The actual value 
will be the default from
+        the underlying Netty transport (Java NIO or native epoll).
+        """
+        self._j_socket_options.setSendBufferSize(send_buffer_size)
+        return self
+
+
+class QueryOptions(object):
+    """
+    Options related to defaults for individual queries.
+    """
+
+    def __init__(self):
+        self._j_query_options = 
get_gateway().jvm.com.datastax.driver.core.QueryOptions()
+
+    def register(self, manager: ClusterManager) -> 'QueryOptions':
+        """
+        register ClusterManager
+        """
+        self._j_query_options.register(manager._j_cluster_manager)
+        return self
+
+    def set_consistency_level(self, consistency_level: ConsistencyLevel) -> 
'QueryOptions':
+        """
+        Sets the default consistency level to use for queries.
+
+        The consistency level set through this method will be use for queries 
that don't explicitly
+        have a consistency level.
+        """
+        
self._j_query_options.setConsistencyLevel(consistency_level._to_j_consistency_level())
+        return self
+
+    def set_serial_consistency_level(self, serial_consistency_level: 
ConsistencyLevel) \
+            -> 'QueryOptions':
+        """
+        Sets the default serial consistency level to use for queries.
+
+        The serial consistency level set through this method will be use for 
queries that don't
+        explicitly have a serial consistency level.
+        """
+        self._j_query_options.setSerialConsistencyLevel(
+            serial_consistency_level._to_j_consistency_level())
+        return self
+
+    def set_fetch_size(self, fetch_size: int) -> 'QueryOptions':
+        """
+        Sets the default fetch size to use for SELECT queries.
+
+        The fetch size set through this method will be use for queries that 
don't explicitly have a
+        fetch size
+        """
+        self._j_query_options.setFetchSize(fetch_size)
+        return self
+
+    def set_default_idempotence(self, default_idempotence: bool) -> 
'QueryOptions':
+        """
+        Sets the default idempotence for queries.
+        """
+        self._j_query_options.setDefaultIdempotence(default_idempotence)
+        return self
+
+    def set_prepare_on_all_hosts(self, prepare_on_all_hosts: bool) -> 
'QueryOptions':
+        """
+        Set whether the driver should prepare statements on all hosts in the 
cluster.
+
+        A statement is normally prepared in two steps:
+         1. prepare the query on a single host in the cluster.
+         2. if that succeeds, prepare on all other hosts.
+
+        This option controls whether step 2 is executed. It is enabled by 
default.
+
+        The reason why you might want to disable it is to optimize network 
usage if you have a large
+        number of clients preparing the same set of statements at startup. If 
your load balancing
+        policy distributes queries randomly, each client will pick a different 
host to prepare its
+        statements, and on the whole each host has a good chance of having 
been hit by at least one
+        client for each statement.
+
+        On the other hand, if that assumption turns out to be wrong and one 
host hasn't prepared a
+        given statement, it needs to be re-prepared on the fly the first time 
it gets executed;
+        this causes a performance penalty (one extra roundtrip to resend the 
query to prepare, and
+        another to retry the execution).
+        """
+        self._j_query_options.setPrepareOnAllHosts(prepare_on_all_hosts)
+        return self
+
+    def set_reprepare_on_up(self, reprepare_on_up: bool) -> 'QueryOptions':
+        """
+        Set whether the driver should re-prepare all cached prepared 
statements on a host when it
+        marks it back up.
+
+        This option is enabled by default.
+
+        The reason why you might want to disable it is to optimize 
reconnection time when you
+        believe hosts often get marked down because of temporary network 
issues, rather than the
+        host really crashing. In that case, the host still has prepared 
statements in its cache when
+        the driver reconnects, so re-preparing is redundant.
+
+        On the other hand, if that assumption turns out to be wrong and the 
host had really
+        restarted, its prepared statement cache is empty, and statements need 
to be re-prepared on
+        the fly the first time they get executed; this causes a performance 
penalty (one extra
+        roundtrip to resend the query to prepare, and another to retry the 
execution).
+        """
+        self._j_query_options.setReprepareOnUp(reprepare_on_up)
+        return self
+
+    def set_metadata_enabled(self, enabled: bool) -> 'QueryOptions':
+        """
+        Toggle client-side token and schema metadata.
+
+        This feature is enabled by default. Some applications might wish to 
disable it in order to
+        eliminate the overhead of querying the metadata and building its 
client-side representation.
+        """
+        self._j_query_options.setMetadataEnabled(enabled)
+        return self
+
+    def set_refresh_schema_interval_millis(self, 
refresh_schema_interval_millis: int) \
+            -> 'QueryOptions':
+        """
+        Sets the default window size in milliseconds used to debounce node 
list refresh requests.
+
+        When the control connection receives a new schema refresh request, it 
puts it on hold and
+        starts a timer, cancelling any previous running timer; when a timer 
expires, then the
+        pending requests are coalesced and executed as a single request.
+        """
+        
self._j_query_options.setRefreshSchemaIntervalMillis(refresh_schema_interval_millis)
+        return self
+
+    def set_max_pending_refresh_schema_requests(self, 
max_pending_refresh_schema_requests: int) \
+            -> 'QueryOptions':
+        """
+        Sets the maximum number of schema refresh requests that the control 
connection can
+        accumulate before executing them.
+
+        When the control connection receives a new schema refresh request, it 
puts it on hold and
+        starts a timer, cancelling any previous running timer; if the control 
connection receives
+        too many events, is parameter allows to trigger execution of pending 
requests, event if the
+        last timer is still running.
+        """
+        self._j_query_options. \
+            
setMaxPendingRefreshSchemaRequests(max_pending_refresh_schema_requests)
+        return self
+
+    def set_refresh_node_list_interval_millis(self, 
refresh_node_list_interval_millis: int) \
+            -> 'QueryOptions':
+        """
+        Sets the default window size in milliseconds used to debounce node 
list refresh requests.
+
+        When the control connection receives a new node list refresh request, 
it puts it on hold and
+        starts a timer, cancelling any previous running timer; when a timer 
expires, then the
+        pending requests are coalesced and executed as a single request.
+        """
+        
self._j_query_options.setRefreshNodeListIntervalMillis(refresh_node_list_interval_millis)
+        return self
+
+    def set_max_pending_refresh_node_list_requests(self,
+                                                   
max_pending_refresh_node_list_requests: int) \
+            -> 'QueryOptions':
+        """
+        Sets the maximum number of node list refresh requests that the control 
connection can
+        accumulate before executing them.
+
+        When the control connection receives a new node list refresh request, 
it puts it on hold and
+        starts a timer, cancelling any previous running timer; if the control 
connection receives
+        too many events, is parameter allows to trigger execution of pending 
requests, event if the
+        last timer is still running.
+        """
+        self._j_query_options. \
+            
setMaxPendingRefreshNodeListRequests(max_pending_refresh_node_list_requests)
+        return self
+
+    def set_refresh_node_interval_millis(self, refresh_node_interval_millis: 
int) \
+            -> 'QueryOptions':
+        """
+        Sets the default window size in milliseconds used to debounce node 
refresh requests.
+
+        When the control connection receives a new node refresh request, it 
puts it on hold and
+        starts a timer, cancelling any previous running timer; when a timer 
expires, then the
+        pending requests are coalesced and executed as a single request.
+        """
+        
self._j_query_options.setRefreshNodeIntervalMillis(refresh_node_interval_millis)
+        return self
+
+    def set_max_pending_refresh_node_requests(self, 
max_pending_refresh_node_requests: int) \
+            -> 'QueryOptions':
+        """
+        Sets the maximum number of node refresh requests that the control 
connection can accumulate
+        before executing them.
+
+        When the control connection receives a new node refresh request, it 
puts it on hold and
+        starts a timer, cancelling any previous running timer; if the control 
connection receives
+        too many events, is parameter allows to trigger execution of pending 
requests, event if the
+        last timer is still running.
+        """
+        
self._j_query_options.setMaxPendingRefreshNodeRequests(max_pending_refresh_node_requests)
+        return self
+
+
+class NettyOptions(object):
+    """
+    A set of hooks that allow clients to customize the driver's underlying 
Netty layer.
+
+    Clients that need to hook into the driver's underlying Netty layer can 
subclass this class and
+    provide the necessary customization by overriding its methods.
+    """
+
+    def __init__(self, j_netty_options):
+        self._j_netty_options = j_netty_options
+
+    @staticmethod
+    def default_netty_options() -> 'NettyOptions':
+        """
+        The default CodecRegistry instance.
+
+        It will be shared among all Cluster instances that were not explicitly 
built with a
+        different instance.
+        """
+        JNettyOptions = get_gateway().jvm.com.datastax.driver.core.NettyOptions
+        return NettyOptions(JNettyOptions.DEFAULT_INSTANCE)
+
+
+class SpeculativeExecutionPolicy(object):
+    """
+    The policy that decides if the driver will send speculative queries to the 
next hosts when the
+    current host takes too long to respond.
+
+    Note that only idempotent statements will be speculatively retried.
+    """
+
+    def __init__(self, j_speculative_execution_policy):
+        self._j_speculative_execution_policy = j_speculative_execution_policy
+
+    @staticmethod
+    def default_speculative_execution_policy() -> 'SpeculativeExecutionPolicy':
+        """
+        The default speculative retry policy.
+
+        The default speculative retry policy is a NoSpeculativeExecutionPolicy.
+        """
+        JPolicies = 
get_gateway().jvm.com.datastax.driver.core.policies.Policies
+        return 
SpeculativeExecutionPolicy(JPolicies.defaultSpeculativeExecutionPolicy())
+
+    @staticmethod
+    def constant_speculative_execution_policy(constant_delay_millis: int,
+                                              max_speculative_executions: int) 
\
+            -> 'SpeculativeExecutionPolicy':
+        """
+        A SpeculativeExecutionPolicy that schedules a given number of 
speculative executions,
+        separated by a fixed delay.
+        """
+        JConstantSpeculativeExecutionPolicy = get_gateway().jvm.\
+            
com.datastax.driver.core.policies.ConstantSpeculativeExecutionPolicy
+        return SpeculativeExecutionPolicy(
+            JConstantSpeculativeExecutionPolicy(constant_delay_millis, 
max_speculative_executions))
+
+    @staticmethod
+    def no_speculative_execution_policy() -> 'SpeculativeExecutionPolicy':
+        """
+        A SpeculativeExecutionPolicy that never schedules speculative 
executions.
+        """
+        JNoSpeculativeExecutionPolicy = get_gateway().jvm.\
+            com.datastax.driver.core.policies.NoSpeculativeExecutionPolicy
+        return 
SpeculativeExecutionPolicy(JNoSpeculativeExecutionPolicy.INSTANCE)
+
+
+class EndPoint:
+    """
+    Encapsulates the information needed by the driver to open connections to a 
node.
+    """
+
+    def __init__(self, j_end_point):
+        self._j_end_point = j_end_point
+
+    @staticmethod
+    def sni_end_point(proxy_address: InetSocketAddress, server_name: str) -> 
'EndPoint':
+        """
+        An endpoint to access nodes through a proxy that uses SNI routing.
+        """
+        JSniEndPoint = get_gateway().jvm.com.datastax.driver.core.SniEndPoint
+        return EndPoint(JSniEndPoint(proxy_address._j_inet_socket_address, 
server_name))
+
+    @staticmethod
+    def translated_address_end_point(translated_address: InetSocketAddress) -> 
'EndPoint':
+        """
+        An endpoint based on server-reported RPC addresses, that might require 
translation if they
+        are accessed through a proxy.
+        """
+        JTranslatedAddressEndPoint = get_gateway().jvm.com.datastax.driver. \
+            core.TranslatedAddressEndPoint
+        return 
EndPoint(JTranslatedAddressEndPoint(translated_address._j_inet_socket_address))
+
+    @staticmethod
+    def wrapping_end_point(address: InetSocketAddress) -> 'EndPoint':
+        """
+        The sole purpose of this class is to allow some exception types to 
preserve a constructor
+        that takes an InetSocketAddress (for backward compatibility).
+        """
+        JWrappingEndPoint = 
get_gateway().jvm.com.datastax.driver.core.exceptions.WrappingEndPoint
+        return EndPoint(JWrappingEndPoint(address._j_inet_socket_address))
+
+
+class EndPointFactory:
+    """
+    Produces EndPoint instances representing the connection information to 
every node.
+
+    This component is reserved for advanced use cases where the driver needs 
more than an IP address
+    to connect.
+
+    Note that if endpoints do not translate to addresses 1-to-1, the auth 
provider and SSL options
+    should be instances of ExtendedAuthProvider and 
ExtendedRemoteEndpointAwareSslOptions
+    respectively.
+    """
+
+    def __init__(self, j_end_point_factory):
+        self._j_end_point_factory = j_end_point_factory
+
+    @staticmethod
+    def default_end_point_factory() -> 'EndPointFactory':
+        """
+        A default EndPointFactory implementation.
+        """
+        JDefaultEndPointFactory = 
get_gateway().jvm.com.datastax.driver.core.DefaultEndPointFactory
+        return EndPointFactory(JDefaultEndPointFactory())
+
+    @staticmethod
+    def sni_end_point_factory(proxy_address: InetSocketAddress) -> 
'EndPointFactory':
+        """
+        A Sni EndPointFactory implementation.
+        """
+        JSniEndPointFactory = 
get_gateway().jvm.com.datastax.driver.core.SniEndPointFactory
+        return 
EndPointFactory(JSniEndPointFactory(proxy_address._j_inet_socket_address))
+
+
+class ThreadingOptions:
+    """
+    A set of hooks that allow clients to customize the driver's internal 
executors.
+
+    The methods in this class are invoked when the cluster initializes. To 
customize the behavior,
+    extend the class and override the appropriate methods.
+
+    This is mainly intended to allow customization and instrumentation of 
driver threads. Each
+    method must return a newly-allocated executor; don't use a shared 
executor, as this could
+    introduce unintended consequences of deadlocks (we're working to simplify 
the driver's
+    architecture and reduce the number of executors in a future release). The 
default
+    implementations use unbounded queues, which is appropriate when the driver 
is properly
+    configured; the only reason you would want to use bounded queues is to 
limit memory consumption
+    in case of a bug or bad configuration. In that case, make sure to use a 
RejectedExecutionHandler
+    that throws, such as java.util.concurrent.ThreadPoolExecutor.AbortPolicy; 
a blocking handler
+    could introduce deadlocks.
+
+    Netty uses a separate pool for I/O operations, that can be configured via 
NettyOptions.
+    """
+
+    def __init__(self):
+        JThreadingOptions = 
get_gateway().jvm.com.datastax.driver.core.ThreadingOptions
+        self._j_threading_options = JThreadingOptions()
+
+
+# ---- Classes introduced to construct the CassandraSink ----
+
+
+class MapperOptions(object):
+    """
+    This class is used to configure a Mapper after deployment.
+    """
+
+    def __init__(self, j_mapper_options):
+        self._j_mapper_options = j_mapper_options
+
+    @staticmethod
+    def simple_mapper_options() -> 'MapperOptions':
+        """
+        A simple method to construct MapperOptions.
+
+        Example:
+        ::
+
+        >>> mapper_option = MapperOptions.simple_mapper_options() \\
+        ...    .ttl(1800) \\
+        ...    .timestamp(3600) \\
+        ...    .consistency_level(ConsistencyLevel.ANY) \\
+        ...    .tracing(True) \\
+        ...    .save_null_fields(True)
+        """
+        JSimpleMapperOptions = 
get_gateway().jvm.org.apache.flink.streaming.connectors. \
+            cassandra.SimpleMapperOptions
+        return MapperOptions(JSimpleMapperOptions())
+
+    def ttl(self, ttl: int) -> 'MapperOptions':
+        """
+        Creates a new Option object to add time-to-live to a mapper operation. 
This is only
+        valid for save operations.
+        """
+        self._j_mapper_options.ttl = ttl
+        return self
+
+    def timestamp(self, timestamp: int) -> 'MapperOptions':
+        """
+        Creates a new Option object to add a timestamp to a mapper operation. 
This is only
+        valid for save and delete operations.
+        """
+        self._j_mapper_options.timestamp = timestamp
+        return self
+
+    def consistency_level(self, cl: ConsistencyLevel) -> 'MapperOptions':
+        """
+        Creates a new Option object to add a consistency level value to a 
mapper operation.
+        This is valid for save, delete and get operations.
+        """
+        self._j_mapper_options.consistencyLevel(cl._to_j_consistency_level())
+        return self
+
+    def tracing(self, enabled: bool) -> 'MapperOptions':
+        """
+        Creates a new Option object to enable query tracing for a mapper 
operation. This is
+        valid for save, delete and get operations.
+        """
+        self._j_mapper_options.tracing(enabled)
+        return self
+
+    def save_null_fields(self, enabled: bool) -> 'MapperOptions':
+        """
+        Creates a new Option object to specify whether null entity fields 
should be included in
+        insert queries. This option is valid only for save operations.
+        """
+        self._j_mapper_options.saveNullFields(enabled)
+        return self
+
+    def if_not_exists(self, enabled: bool) -> 'MapperOptions':
+        """
+        Creates a new Option object to specify whether an IF NOT EXISTS clause 
should be included in
+        insert queries. This option is valid only for save operations.
+
+        If this option is not specified, it defaults to false (IF NOT EXISTS 
statements are not
+        used).
+        """
+        self._j_mapper_options.ifNotExists(enabled)
+        return self
+
+
+class ClusterBuilder(object):
+    """
+    This class is used to configure a Cluster after deployment. The cluster 
represents the
+    connection that will be established to Cassandra.
+    """
+
+    def __init__(self, j_cluster_builder):
+        self._j_cluster_builder = j_cluster_builder
+
+    @staticmethod
+    def simple_cluster_builder() -> 'ClusterBuilder':
+        """
+        A simple method to construct ClusterBuilder.
+
+        Example:
+        ::
+
+        >>> cluster_builder = ClusterBuilder.simple_cluster_builder() \\
+        ...    .add_contact_points("127.0.0.1") \\
+        ...    .with_port(9876) \\
+        ...    .with_cluster_name("cluster_name") \\
+        ...    .with_credentials("user", "password")  \\
+        ...    .with_ssl()
+        """
+        JSimpleClusterBuilder = 
get_gateway().jvm.org.apache.flink.streaming.connectors.\
+            cassandra.SimpleClusterBuilder
+        return ClusterBuilder(JSimpleClusterBuilder())
+
+    def with_cluster_name(self, name: str) -> 'ClusterBuilder':
+        """
+        An optional name for the creation cluster.
+
+        Note: this is not related to the Cassandra cluster name (though you 
are free to provide the
+        same name).
+        """
+        self._j_cluster_builder.withClusterName(name)
+        return self
+
+    def with_port(self, port: int) -> 'ClusterBuilder':
+        """
+        The port to use to connect to the Cassandra host.
+
+        If not set through this method, the default port (9042) will be used 
instead.
+        """
+        self._j_cluster_builder.withPort(port)
+        return self
+
+    def allow_beta_protocol_version(self) -> 'ClusterBuilder':
+        """
+        Create cluster connection using latest development protocol version, 
which is currently in
+        beta. Calling this method will result into setting USE_BETA flag in 
all outgoing messages,
+        which allows server to negotiate the supported protocol version even 
if it is currently in
+        beta.
+
+        This feature is only available starting with version V5.
+
+        Use with caution, refer to the server and protocol documentation for 
the details on latest
+        protocol version.
+        """
+        self._j_cluster_builder.allowBetaProtocolVersion()
+        return self
+
+    def with_max_schema_agreement_wait_seconds(self, 
max_schema_agreement_wait_seconds: int) \
+            -> 'ClusterBuilder':
+        """
+        Sets the maximum time to wait for schema agreement before returning 
from a DDL query.
+
+        If not set through this method, the default value (10 seconds) will be 
used.
+        """
+        
self._j_cluster_builder.withMaxSchemaAgreementWaitSeconds(max_schema_agreement_wait_seconds)
+        return self
+
+    def with_protocol_version(self, version: ProtocolVersion) -> 
'ClusterBuilder':
+        """
+        The native protocol version to use.
+        """
+        
self._j_cluster_builder.withProtocolVersion(version._to_j_protocol_version())
+        return self
+
+    def add_contact_point(self, address: Union[str, EndPoint]) -> 
'ClusterBuilder':
+        """
+        Adds a contact point by str or EndPoint.
+        """
+        if isinstance(address, str):
+            self._j_cluster_builder.addContactPoint(address)
+        else:
+            self._j_cluster_builder.addContactPoint(address._j_end_point)
+        return self
+
+    def add_contact_points(self, *addresses: Union[str, InetAddress]) -> 
'ClusterBuilder':
+        """
+        Adds contact points.
+        """
+        gateway = get_gateway()
+        j_addresses = []
+        for address in addresses:
+            if isinstance(address, str):
+                j_addresses.append(address)
+            else:
+                j_addresses.append((address._j_inet_address))
+        if len(addresses) > 0:
+            if isinstance(addresses[0], str):
+                
self._j_cluster_builder.addContactPoints(to_jarray(gateway.jvm.String, 
addresses))
+            else:
+                self._j_cluster_builder.addContactPoints(
+                    gateway.jvm.java.net.InetAddress, addresses)
+        return self
+
+    def add_contact_points_with_ports(self, *addresses: InetSocketAddress) -> 
'ClusterBuilder':

Review Comment:
   This is the part about ClusterBuilder, I will create a new PR and fix it for 
easy review~



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to