pengmide commented on code in PR #19945:
URL: https://github.com/apache/flink/pull/19945#discussion_r907976919


##########
flink-python/pyflink/datastream/connectors/cassandra.py:
##########
@@ -0,0 +1,1648 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+from enum import Enum
+from typing import List, Union
+
+from pyflink.common import Duration, Configuration
+from pyflink.java_gateway import get_gateway
+from pyflink.util.java_utils import to_jarray
+
+# ---- Classes introduced to construct the MapperOptions ----
+
+
+class ConsistencyLevel(Enum):
+    """
+    The consistency level
+    """
+    ANY = 0
+    ONE = 1
+    TWO = 2
+    THREE = 3
+    QUORUM = 4
+    ALL = 5
+    LOCAL_QUORUM = 6
+    EACH_QUORUM = 7
+    SERIAL = 8
+    LOCAL_SERIAL = 9
+    LOCAL_ONE = 10
+
+    def _to_j_consistency_level(self):
+        JConsistencyLevel = 
get_gateway().jvm.com.datastax.driver.core.ConsistencyLevel
+        return getattr(JConsistencyLevel, self.name)
+
+
+# ---- Classes introduced to construct the ClusterBuilder ----
+
+
+class ProtocolVersion(Enum):
+    """
+    Versions of the native protocol supported by the driver.
+    """
+    V1 = 0
+    V2 = 1
+    V3 = 2
+    v4 = 3
+    V5 = 4
+    V6 = 5
+
+    def _to_j_protocol_version(self):
+        JProtocolVersion = 
get_gateway().jvm.com.datastax.driver.core.ProtocolVersion
+        return getattr(JProtocolVersion, self.name)
+
+
+class InetAddress(object):
+    """
+    This class represents an Internet Protocol (IP) address.
+    """
+    def __init__(self, j_inet_address):
+        self._j_inet_address = j_inet_address
+
+
+class InetSocketAddress(object):
+    """
+    This class implements an IP Socket Address (IP address + port number) It 
can also be a pair
+    (hostname + port number), in which case an attempt will be made to resolve 
the hostname.
+    If resolution fails then the address is said to be unresolved but can 
still be used on some
+    circumstances like connecting through a proxy.
+
+    It provides an immutable object used by sockets for binding, connecting, 
or as returned values.
+
+    The wildcard is a special local IP address. It usually means "any" and can 
only be used for bind
+    operations.
+    """
+    def __init__(self, j_inet_socket_address):
+        self._j_inet_socket_address = j_inet_socket_address
+
+
+class LoadBalancingPolicy(object):
+    """
+    The policy that decides which Cassandra hosts to contact for each new 
query.
+
+    The LoadBalancingPolicy is informed of hosts up/down events. For 
efficiency purposes, the policy
+    is expected to exclude down hosts from query plans.
+    """
+
+    def __init__(self, j_load_balancing_policy):
+        self._j_load_balancing_policy = j_load_balancing_policy
+
+    @staticmethod
+    def default_load_balancing_policy() -> 'LoadBalancingPolicy':
+        """
+        The default load balancing policy.
+
+        The default load balancing policy is DCAwareRoundRobinPolicy with 
token awareness.
+        """
+        JPolicies = 
get_gateway().jvm.com.datastax.driver.core.policies.Policies
+        return LoadBalancingPolicy(JPolicies.defaultLoadBalancingPolicy())
+
+    @staticmethod
+    def round_robin_policy() -> 'LoadBalancingPolicy':
+        """
+        A Round-robin load balancing policy.
+
+        This policy queries nodes in a round-robin fashion. For a given query, 
if an host fail, the
+        next one (following the round-robin order) is tried, until all hosts 
have been tried.
+
+        This policy is not datacenter aware and will include every known 
Cassandra hosts in its
+        round-robin algorithm. If you use multiple datacenter this will be 
inefficient, and you will
+        want to use the DCAwareRoundRobinPolicy load balancing policy instead.
+        """
+        JRoundRobinPolicy = 
get_gateway().jvm.com.datastax.driver.core.policies.RoundRobinPolicy
+        return LoadBalancingPolicy(JRoundRobinPolicy())
+
+
+class ReconnectionPolicy(object):
+    """
+    Policy that decides how often the reconnection to a dead node is attempted.
+
+    Note that if the driver receives a push notification from the Cassandra 
cluster that a node is
+    UP, any existing ReconnectionSchedule on that node will be cancelled and a 
new one will be
+    created (in effect, the driver reset the scheduler).
+
+    The default ExponentialReconnectionPolicy policy is usually adequate.
+    """
+
+    def __init__(self, j_reconnection_policy):
+        self._j_reconnection_policy = j_reconnection_policy
+
+    @staticmethod
+    def default_reconnection_policy() -> 'ReconnectionPolicy':

Review Comment:
   This is the part about ClusterBuilder, I will create a new PR and fix it for 
easy review~



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to