pengmide commented on code in PR #19945: URL: https://github.com/apache/flink/pull/19945#discussion_r907976919
########## flink-python/pyflink/datastream/connectors/cassandra.py: ########## @@ -0,0 +1,1648 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ +from enum import Enum +from typing import List, Union + +from pyflink.common import Duration, Configuration +from pyflink.java_gateway import get_gateway +from pyflink.util.java_utils import to_jarray + +# ---- Classes introduced to construct the MapperOptions ---- + + +class ConsistencyLevel(Enum): + """ + The consistency level + """ + ANY = 0 + ONE = 1 + TWO = 2 + THREE = 3 + QUORUM = 4 + ALL = 5 + LOCAL_QUORUM = 6 + EACH_QUORUM = 7 + SERIAL = 8 + LOCAL_SERIAL = 9 + LOCAL_ONE = 10 + + def _to_j_consistency_level(self): + JConsistencyLevel = get_gateway().jvm.com.datastax.driver.core.ConsistencyLevel + return getattr(JConsistencyLevel, self.name) + + +# ---- Classes introduced to construct the ClusterBuilder ---- + + +class ProtocolVersion(Enum): + """ + Versions of the native protocol supported by the driver. 
+ """ + V1 = 0 + V2 = 1 + V3 = 2 + v4 = 3 + V5 = 4 + V6 = 5 + + def _to_j_protocol_version(self): + JProtocolVersion = get_gateway().jvm.com.datastax.driver.core.ProtocolVersion + return getattr(JProtocolVersion, self.name) + + +class InetAddress(object): + """ + This class represents an Internet Protocol (IP) address. + """ + def __init__(self, j_inet_address): + self._j_inet_address = j_inet_address + + +class InetSocketAddress(object): + """ + This class implements an IP Socket Address (IP address + port number) It can also be a pair + (hostname + port number), in which case an attempt will be made to resolve the hostname. + If resolution fails then the address is said to be unresolved but can still be used on some + circumstances like connecting through a proxy. + + It provides an immutable object used by sockets for binding, connecting, or as returned values. + + The wildcard is a special local IP address. It usually means "any" and can only be used for bind + operations. + """ + def __init__(self, j_inet_socket_address): + self._j_inet_socket_address = j_inet_socket_address + + +class LoadBalancingPolicy(object): + """ + The policy that decides which Cassandra hosts to contact for each new query. + + The LoadBalancingPolicy is informed of hosts up/down events. For efficiency purposes, the policy + is expected to exclude down hosts from query plans. + """ + + def __init__(self, j_load_balancing_policy): + self._j_load_balancing_policy = j_load_balancing_policy + + @staticmethod + def default_load_balancing_policy() -> 'LoadBalancingPolicy': + """ + The default load balancing policy. + + The default load balancing policy is DCAwareRoundRobinPolicy with token awareness. + """ + JPolicies = get_gateway().jvm.com.datastax.driver.core.policies.Policies + return LoadBalancingPolicy(JPolicies.defaultLoadBalancingPolicy()) + + @staticmethod + def round_robin_policy() -> 'LoadBalancingPolicy': + """ + A Round-robin load balancing policy. 
+ + This policy queries nodes in a round-robin fashion. For a given query, if an host fail, the + next one (following the round-robin order) is tried, until all hosts have been tried. + + This policy is not datacenter aware and will include every known Cassandra hosts in its + round-robin algorithm. If you use multiple datacenter this will be inefficient, and you will + want to use the DCAwareRoundRobinPolicy load balancing policy instead. + """ + JRoundRobinPolicy = get_gateway().jvm.com.datastax.driver.core.policies.RoundRobinPolicy + return LoadBalancingPolicy(JRoundRobinPolicy()) + + +class ReconnectionPolicy(object): + """ + Policy that decides how often the reconnection to a dead node is attempted. + + Note that if the driver receives a push notification from the Cassandra cluster that a node is + UP, any existing ReconnectionSchedule on that node will be cancelled and a new one will be + created (in effect, the driver reset the scheduler). + + The default ExponentialReconnectionPolicy policy is usually adequate. + """ + + def __init__(self, j_reconnection_policy): + self._j_reconnection_policy = j_reconnection_policy + + @staticmethod + def default_reconnection_policy() -> 'ReconnectionPolicy': Review Comment: This part belongs to the ClusterBuilder; I will create a new PR and fix it there to make it easier to review. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
