-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/35591/#review94911
-----------------------------------------------------------

Can you add documentation to the existing Kafka Connector documentation in docs/src/site/sphinx/Connectors.rst?


connector/connector-kafka/src/main/java/org/apache/sqoop/connector/kafka/KafkaConstants.java (lines 42 - 46)
<https://reviews.apache.org/r/35591/#comment149597>

Are these configurable by the end user? Should they be?


connector/connector-kafka/src/main/java/org/apache/sqoop/connector/kafka/KafkaExtractor.java (lines 78 - 79)
<https://reviews.apache.org/r/35591/#comment149593>

The broker list (in other Kafka usages) usually takes a list like host1:port1,host2:port2, where at least one broker will be available. This assumes only 1 broker.


connector/connector-kafka/src/main/java/org/apache/sqoop/connector/kafka/KafkaExtractor.java (line 83)
<https://reviews.apache.org/r/35591/#comment149602>

This only allows all brokers to be on 1 port. Each broker can have a different port, which is fairly common in pseudo-distributed test environments.


connector/connector-kafka/src/main/java/org/apache/sqoop/connector/kafka/KafkaExtractor.java (line 86)
<https://reviews.apache.org/r/35591/#comment149594>

This would cause very unexpected behavior for a user with any topic that has more than 1 partition, as it would only read the first partition and ignore all other data. I don't think a connector that can only read 1 hardcoded partition is useful. The default should be reading all partitions.

You should be able to use the KafkaPartition passed in by the Extractor to pick which partition you are consuming. Though I am not well versed in Sqoop2, that appears to be what the partitioners are for.


connector/connector-kafka/src/main/java/org/apache/sqoop/connector/kafka/KafkaExtractor.java (line 98)
<https://reviews.apache.org/r/35591/#comment149595>

Unfortunately, in the next release of Kafka (released October-ish) there will be a new consumer. It may be worth tracking that now instead of adding the connector on the old consumer, which will no longer be used going forward. It would help the test & design effort of the new consumer as well. That may not be an option, but I figured I would suggest it.


connector/connector-kafka/src/main/java/org/apache/sqoop/connector/kafka/KafkaExtractor.java (line 100)
<https://reviews.apache.org/r/35591/#comment149608>

This implies we will only read an entire topic each time. Even if we don't support custom offsets, supporting committing offsets to Kafka would be nice and would give you incremental loads.

If you look at ConsoleConsumer in Kafka, it offers options like --from-beginning and --delete-consumer-offsets (which is like saying --from-end), and picks up where you left off by default.


connector/connector-kafka/src/main/java/org/apache/sqoop/connector/kafka/KafkaExtractor.java (line 116)
<https://reviews.apache.org/r/35591/#comment149596>

DEFAULT_RETRY_TIME is a confusing name. It seems like it's some form of MAX_ERRORS. Though I am not sure failing after X errors makes sense. I suspect either the error is unrecoverable or we can safely recover.


connector/connector-kafka/src/main/java/org/apache/sqoop/connector/kafka/KafkaExtractor.java (line 119)
<https://reviews.apache.org/r/35591/#comment149598>

This could indicate some issue in Kafka. In many cases the retention time configured for the topic is too low and we may be losing data. At a minimum we should log a warning that this is occurring.


connector/connector-kafka/src/main/java/org/apache/sqoop/connector/kafka/KafkaExtractor.java (line 124)
<https://reviews.apache.org/r/35591/#comment149599>

Should this only occur for certain errors? For example: NotLeaderForPartitionCode, LeaderNotAvailableCode, BrokerNotAvailableCode.


connector/connector-kafka/src/main/java/org/apache/sqoop/connector/kafka/KafkaExtractor.java (line 137)
<https://reviews.apache.org/r/35591/#comment149605>

When/how does this happen?

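
To make the broker-list comments on KafkaExtractor.java (lines 78 - 79 and line 83) more concrete, here is a minimal sketch of parsing a host1:port1,host2:port2 list so that each broker keeps its own port. The class and method names (BrokerListParser, Broker, parse) are hypothetical and are not taken from the patch; treat this as one possible shape, not as the way the connector has to do it.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of the patch under review.
class BrokerListParser {

    /** Immutable host/port pair for a single broker. */
    static final class Broker {
        final String host;
        final int port;

        Broker(String host, int port) {
            this.host = host;
            this.port = port;
        }

        @Override
        public String toString() {
            return host + ":" + port;
        }
    }

    /** Parses "host1:port1,host2:port2" into one Broker per entry. */
    static List<Broker> parse(String brokerList) {
        if (brokerList == null || brokerList.trim().isEmpty()) {
            throw new IllegalArgumentException("Broker list is empty");
        }
        List<Broker> brokers = new ArrayList<>();
        for (String entry : brokerList.split(",")) {
            String trimmed = entry.trim();
            if (trimmed.isEmpty()) {
                continue; // tolerate a stray or trailing comma
            }
            int colon = trimmed.lastIndexOf(':');
            if (colon <= 0 || colon == trimmed.length() - 1) {
                throw new IllegalArgumentException(
                    "Expected host:port but got '" + trimmed + "'");
            }
            String host = trimmed.substring(0, colon);
            int port;
            try {
                port = Integer.parseInt(trimmed.substring(colon + 1));
            } catch (NumberFormatException e) {
                throw new IllegalArgumentException(
                    "Invalid port in '" + trimmed + "'", e);
            }
            brokers.add(new Broker(host, port));
        }
        if (brokers.isEmpty()) {
            throw new IllegalArgumentException("Broker list has no entries");
        }
        return brokers;
    }

    public static void main(String[] args) {
        // Each broker keeps its own port, as in a pseudo-distributed setup.
        for (Broker b : parse("host1:9092, host2:9093")) {
            System.out.println(b);
        }
    }
}
```

The extractor could then try each parsed broker in turn when looking up metadata, so a single unavailable broker would not fail the job.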

connector/connector-kafka/src/main/java/org/apache/sqoop/connector/kafka/KafkaExtractor.java (line 146)
<https://reviews.apache.org/r/35591/#comment149615>

I am not sure about Sqoop2 SerDe support, but does this imply we only support topics that contain strings? What if a topic contains other formats?


connector/connector-kafka/src/main/java/org/apache/sqoop/connector/kafka/KafkaExtractor.java (line 148)
<https://reviews.apache.org/r/35591/#comment149606>

KAFKA_CONNECTOR_0004's error message is really generic.


connector/connector-kafka/src/main/java/org/apache/sqoop/connector/kafka/KafkaExtractor.java (line 154)
<https://reviews.apache.org/r/35591/#comment149607>

Would this do anything, since in this case hasData = false and the while loop would kick out?


connector/connector-kafka/src/main/java/org/apache/sqoop/connector/kafka/KafkaExtractor.java (line 178)
<https://reviews.apache.org/r/35591/#comment149610>

Should this really return 0? What kind of errors can happen?


connector/connector-kafka/src/main/java/org/apache/sqoop/connector/kafka/KafkaExtractor.java (line 182)
<https://reviews.apache.org/r/35591/#comment149611>

When can this happen? Should it return 0?


connector/connector-kafka/src/main/java/org/apache/sqoop/connector/kafka/KafkaExtractor.java (line 189)
<https://reviews.apache.org/r/35591/#comment149603>

3 seems like an arbitrary number. At least name it in a variable to convey its purpose.


connector/connector-kafka/src/main/java/org/apache/sqoop/connector/kafka/KafkaExtractor.java (line 190)
<https://reviews.apache.org/r/35591/#comment149613>

It looks like m_replicaBrokers is not needed or used. I am not a fan of mutating passed arguments. It makes state hard to track.


connector/connector-kafka/src/main/java/org/apache/sqoop/connector/kafka/KafkaExtractor.java (line 200)
<https://reviews.apache.org/r/35591/#comment149612>

Should at least do something here, or don't catch it.

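
The last three comments above (lines 189, 190 and 200) can be sketched together as a small retry helper: the attempt count gets a name, the result is returned instead of being written into a passed-in argument, and a caught exception is logged rather than swallowed. Everything here is an assumption for illustration: the class LeaderLookupRetry, the constants, and the choice to throw once attempts are exhausted are hypothetical and do not come from the patch.

```java
import java.util.concurrent.Callable;
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical helper, not part of the patch under review.
class LeaderLookupRetry {

    private static final Logger LOG =
        Logger.getLogger(LeaderLookupRetry.class.getName());

    /** Named replacement for the bare literal 3 (hypothetical name). */
    static final int MAX_LEADER_LOOKUP_ATTEMPTS = 3;

    /** Pause between attempts; the value is an assumption. */
    static final long LEADER_LOOKUP_BACKOFF_MS = 1000L;

    /**
     * Calls the lookup until it yields a non-null result or the attempts
     * run out. The result is returned; no argument is mutated.
     */
    static <T> T findWithRetry(Callable<T> lookup, int maxAttempts, long backoffMs)
            throws InterruptedException {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                T result = lookup.call();
                if (result != null) {
                    return result;
                }
                LOG.warning("Lookup attempt " + attempt + " returned no result");
            } catch (InterruptedException e) {
                throw e; // never swallow interruption
            } catch (Exception e) {
                // Log the failure instead of leaving an empty catch block.
                LOG.log(Level.WARNING, "Lookup attempt " + attempt + " failed", e);
            }
            if (attempt < maxAttempts) {
                Thread.sleep(backoffMs);
            }
        }
        // Fail loudly rather than returning a value the caller must null-check.
        throw new IllegalStateException(
            "No result after " + maxAttempts + " attempts");
    }
}
```

A caller would pass the actual lookup as the Callable, for example `findWithRetry(lookup, MAX_LEADER_LOOKUP_ATTEMPTS, LEADER_LOOKUP_BACKOFF_MS)`, which keeps the retry policy in one place.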

connector/connector-kafka/src/main/java/org/apache/sqoop/connector/kafka/KafkaExtractor.java (line 203)
<https://reviews.apache.org/r/35591/#comment149604>

Broker failure is not the only reason for a leader change. It could simply be a leader election.


connector/connector-kafka/src/main/java/org/apache/sqoop/connector/kafka/KafkaExtractor.java (line 204)
<https://reviews.apache.org/r/35591/#comment149616>

Should this return null? The error message above says "Exiting".


connector/connector-kafka/src/main/java/org/apache/sqoop/connector/kafka/KafkaExtractor.java (line 207)
<https://reviews.apache.org/r/35591/#comment149600>

Should this be findPartitionMetadata?


connector/connector-kafka/src/main/java/org/apache/sqoop/connector/kafka/KafkaExtractor.java (line 244)
<https://reviews.apache.org/r/35591/#comment149601>

Perhaps createSimpleConsumer, since it makes a new one?


connector/connector-kafka/src/main/java/org/apache/sqoop/connector/kafka/KafkaPartitioner.java (lines 42 - 46)
<https://reviews.apache.org/r/35591/#comment149614>

I think this should be where you look up all partitions in a topic and return them for use in the Extractor.


Thanks for submitting a patch for this! I know my review has a lot of comments, but I wanted to put all of my thoughts out there. Feel free to discuss any of them.

- Grant Henke


On Aug. 4, 2015, 8:25 a.m., richard zhou wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/35591/
> -----------------------------------------------------------
> 
> (Updated Aug. 4, 2015, 8:25 a.m.)
> 
> 
> Review request for Sqoop.
> 
> 
> Repository: sqoop-sqoop2
> 
> 
> Description
> -------
> 
> This is an initial patch. It may not work currently. Please check whether
> this design of framework is fine or not.
> 
> 
> Diffs
> -----
> 
>   common/src/main/java/org/apache/sqoop/error/code/KafkaConnectorErrors.java 436b852b0c69258753d7f09c3411bc19d16957a3
>   connector/connector-kafka/src/main/java/org/apache/sqoop/connector/kafka/KafkaConnector.java e3eafe87f7376031d79fec76739fd9e648df872b
>   connector/connector-kafka/src/main/java/org/apache/sqoop/connector/kafka/KafkaConstants.java 9d3877db4ab53b09aec34511be6a29cc28611f63
>   connector/connector-kafka/src/main/java/org/apache/sqoop/connector/kafka/KafkaExtractor.java PRE-CREATION
>   connector/connector-kafka/src/main/java/org/apache/sqoop/connector/kafka/KafkaFromDestroyer.java PRE-CREATION
>   connector/connector-kafka/src/main/java/org/apache/sqoop/connector/kafka/KafkaFromInitializer.java PRE-CREATION
>   connector/connector-kafka/src/main/java/org/apache/sqoop/connector/kafka/KafkaPartition.java PRE-CREATION
>   connector/connector-kafka/src/main/java/org/apache/sqoop/connector/kafka/KafkaPartitioner.java PRE-CREATION
>   connector/connector-kafka/src/main/java/org/apache/sqoop/connector/kafka/configuration/FromJobConfig.java PRE-CREATION
>   connector/connector-kafka/src/main/java/org/apache/sqoop/connector/kafka/configuration/FromJobConfiguration.java PRE-CREATION
>   connector/connector-kafka/src/main/resources/kafka-connector-config.properties d3e1e6f3f0de8bae60ac22bbf718dddd8330362a
> 
> Diff: https://reviews.apache.org/r/35591/diff/
> 
> 
> Testing
> -------
> 
> Passed compile, but it still needs to be tested in the real cluster.
> 
> 
> Thanks,
> 
> richard zhou
> 
>
