pengmide commented on code in PR #19945: URL: https://github.com/apache/flink/pull/19945#discussion_r907977299
########## flink-python/pyflink/datastream/connectors/cassandra.py: ########## @@ -0,0 +1,1648 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ +from enum import Enum +from typing import List, Union + +from pyflink.common import Duration, Configuration +from pyflink.java_gateway import get_gateway +from pyflink.util.java_utils import to_jarray + +# ---- Classes introduced to construct the MapperOptions ---- + + +class ConsistencyLevel(Enum): + """ + The consistency level + """ + ANY = 0 + ONE = 1 + TWO = 2 + THREE = 3 + QUORUM = 4 + ALL = 5 + LOCAL_QUORUM = 6 + EACH_QUORUM = 7 + SERIAL = 8 + LOCAL_SERIAL = 9 + LOCAL_ONE = 10 + + def _to_j_consistency_level(self): + JConsistencyLevel = get_gateway().jvm.com.datastax.driver.core.ConsistencyLevel + return getattr(JConsistencyLevel, self.name) + + +# ---- Classes introduced to construct the ClusterBuilder ---- + + +class ProtocolVersion(Enum): + """ + Versions of the native protocol supported by the driver. 
+ """ + V1 = 0 + V2 = 1 + V3 = 2 + v4 = 3 + V5 = 4 + V6 = 5 + + def _to_j_protocol_version(self): + JProtocolVersion = get_gateway().jvm.com.datastax.driver.core.ProtocolVersion + return getattr(JProtocolVersion, self.name) + + +class InetAddress(object): + """ + This class represents an Internet Protocol (IP) address. + """ + def __init__(self, j_inet_address): + self._j_inet_address = j_inet_address + + +class InetSocketAddress(object): + """ + This class implements an IP Socket Address (IP address + port number). It can also be a pair + (hostname + port number), in which case an attempt will be made to resolve the hostname. + If resolution fails then the address is said to be unresolved but can still be used in some + circumstances like connecting through a proxy. + + It provides an immutable object used by sockets for binding, connecting, or as return values. + + The wildcard is a special local IP address. It usually means "any" and can only be used for bind + operations. + """ + def __init__(self, j_inet_socket_address): + self._j_inet_socket_address = j_inet_socket_address + + +class LoadBalancingPolicy(object): + """ + The policy that decides which Cassandra hosts to contact for each new query. + + The LoadBalancingPolicy is informed of host up/down events. For efficiency purposes, the policy + is expected to exclude down hosts from query plans. + """ + + def __init__(self, j_load_balancing_policy): + self._j_load_balancing_policy = j_load_balancing_policy + + @staticmethod + def default_load_balancing_policy() -> 'LoadBalancingPolicy': + """ + The default load balancing policy. + + The default load balancing policy is DCAwareRoundRobinPolicy with token awareness. + """ + JPolicies = get_gateway().jvm.com.datastax.driver.core.policies.Policies + return LoadBalancingPolicy(JPolicies.defaultLoadBalancingPolicy()) + + @staticmethod + def round_robin_policy() -> 'LoadBalancingPolicy': + """ + A Round-robin load balancing policy.
+ + This policy queries nodes in a round-robin fashion. For a given query, if a host fails, the + next one (following the round-robin order) is tried, until all hosts have been tried. + + This policy is not datacenter aware and will include every known Cassandra host in its + round-robin algorithm. If you use multiple datacenters, this will be inefficient, and you will + want to use the DCAwareRoundRobinPolicy load balancing policy instead. + """ + JRoundRobinPolicy = get_gateway().jvm.com.datastax.driver.core.policies.RoundRobinPolicy + return LoadBalancingPolicy(JRoundRobinPolicy()) + + +class ReconnectionPolicy(object): + """ + Policy that decides how often the reconnection to a dead node is attempted. + + Note that if the driver receives a push notification from the Cassandra cluster that a node is + UP, any existing ReconnectionSchedule on that node will be cancelled and a new one will be + created (in effect, the driver resets the scheduler). + + The default ExponentialReconnectionPolicy policy is usually adequate. + """ + + def __init__(self, j_reconnection_policy): + self._j_reconnection_policy = j_reconnection_policy + + @staticmethod + def default_reconnection_policy() -> 'ReconnectionPolicy': + """ + The default reconnection policy. + + The default reconnection policy is an ExponentialReconnectionPolicy where the base delay is + 1 second and the max delay is 10 minutes. + """ + JPolicies = get_gateway().jvm.com.datastax.driver.core.policies.Policies + return ReconnectionPolicy(JPolicies.defaultReconnectionPolicy()) + + +class RetryPolicy(object): + """ + A policy that defines a default behavior to adopt when a request fails. + + There are three possible decisions: + - RETHROW: no retry should be attempted and an exception should be thrown. + - RETRY: the operation will be retried. The consistency level of the retry should be specified. + - IGNORE: no retry should be attempted and the exception should be ignored.
In that case, the + operation that triggered the Cassandra exception will return an empty result set. + """ + + def __init__(self, j_retry_policy): + self._j_retry_policy = j_retry_policy + + @staticmethod + def default_retry_policy() -> 'RetryPolicy': + """ + The default retry policy. + """ + JPolicies = get_gateway().jvm.com.datastax.driver.core.policies.Policies + return RetryPolicy(JPolicies.defaultRetryPolicy()) + + @staticmethod + def fallthrough_retry_policy() -> 'RetryPolicy': + """ + A retry policy that never retries (nor ignores). + """ + JFallthroughRetryPolicy = get_gateway().jvm.com.datastax.driver.core.policies.\ + FallthroughRetryPolicy + return RetryPolicy(JFallthroughRetryPolicy.INSTANCE) + + +class AddressTranslator(object): + """ + Translates IP addresses received from Cassandra nodes into locally queryable addresses. + + The driver auto-detects new Cassandra nodes added to the cluster through server-side push + notifications and through checking the system tables. For each node, the address the driver + will receive will correspond to the address set as rpc_address in the node yaml file. + In most cases, this is the correct address for the driver to use and that is what is used by + default. However, sometimes the addresses received through this mechanism will either not be + reachable directly by the driver or should not be the preferred address to use to reach the + node (for instance, the rpc_address set on Cassandra nodes might be a private IP, but some + clients may have to use a public IP, or go through a router to reach that node). This interface + makes it possible to deal with such cases by translating an address as sent by a Cassandra node + to another address to be used by the driver for connection. + + Please note that the contact point addresses provided while creating the Cluster instance are + not "translated"; only IP addresses retrieved from or sent by Cassandra nodes to the driver are.
+ """ + + def __init__(self, j_address_translator): + self._j_address_translator = j_address_translator + + @staticmethod + def default_address_translator() -> 'AddressTranslator': + """ + The default address translator. + + This is an instance of IdentityTranslator. + """ + JPolicies = get_gateway().jvm.com.datastax.driver.core.policies.Policies + return AddressTranslator(JPolicies.defaultAddressTranslator()) + + @staticmethod + def ec2_multi_region_address_translator() -> 'AddressTranslator': + """ + AddressTranslator implementation for a multi-region EC2 deployment where clients are also + deployed in EC2. + """ + JEC2MultiRegionAddressTranslator = get_gateway().jvm.\ + com.datastax.driver.core.policies.EC2MultiRegionAddressTranslator + return AddressTranslator(JEC2MultiRegionAddressTranslator()) + + +class TimestampGenerator(object): + """ + Generates client-side, microsecond-precision query timestamps. + + Given that Cassandra uses those timestamps to resolve conflicts, implementations should generate + incrementing timestamps for successive invocations. + """ Review Comment: This is the part about ClusterBuilder; I will create a new PR to fix it, for easier review~ -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
