XuMingmin commented on a change in pull request #1127: [CALCITE-2913] add a 
KafkaAdapter for Stream
URL: https://github.com/apache/calcite/pull/1127#discussion_r283087983
 
 

 ##########
 File path: 
kafka/src/main/java/org/apache/calcite/adapter/kafka/KafkaMessageEnumerator.java
 ##########
 @@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.adapter.kafka;
+
+import org.apache.calcite.linq4j.Enumerator;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+
+import java.time.Duration;
+import java.util.ConcurrentModificationException;
+import java.util.LinkedList;
+import java.util.NoSuchElementException;
+
+/**
+ * Enumerator to read data from {@link Consumer},
+ * and converted into SQL rows with {@link KafkaRowConverter}.
+ * @param <K>: type for Kafka message key,
+ *           refer to {@link ConsumerConfig#KEY_DESERIALIZER_CLASS_CONFIG};
+ * @param <V>: type for Kafka message value,
+ *           refer to {@link ConsumerConfig#VALUE_DESERIALIZER_CLASS_CONFIG};
+ */
+public class KafkaMessageEnumerator<K, V> implements Enumerator {
+  final Consumer consumer;
+  final KafkaRowConverter<K, V> rowConverter;
+
+  //runtime
+  private LinkedList<ConsumerRecord<K, V>> bufferedRecords = new 
LinkedList<>();
+  private ConsumerRecord<K, V> curRecord;
+
+  public KafkaMessageEnumerator(final Consumer consumer,
+      final KafkaRowConverter<K, V> rowConverter) {
+    this.consumer = consumer;
+    this.rowConverter = rowConverter;
+  }
+
+  /**
+   * Gets the current element in the collection.
+   *
+   * <p>After an enumerator is created or after the {@link #reset} method is
+   * called, the {@link #moveNext} method must be called to advance the
+   * enumerator to the first element of the collection before reading the
+   * value of the {@code current} property; otherwise, {@code current} is
+   * undefined.
+   *
+   * <p>This method also throws {@link NoSuchElementException} if
+   * the last call to {@code moveNext} returned {@code false}, which indicates
+   * the end of the collection.
+   *
+   * <p>This method does not move the position of the enumerator, and
+   * consecutive calls to {@code current} return the same object until either
+   * {@code moveNext} or {@code reset} is called.
+   *
+   * <p>An enumerator remains valid as long as the collection remains
+   * unchanged. If changes are made to the collection, such as adding,
+   * modifying, or deleting elements, the enumerator is irrecoverably
+   * invalidated. The next call to {@code moveNext} or {@code reset} may,
+   * at the discretion of the implementation, throw a
+   * {@link ConcurrentModificationException}. If the collection is
+   * modified between {@code moveNext} and {@code current}, {@code current}
+   * returns the element that it is set to, even if the enumerator is already
+   * invalidated.
+   *
+   * @return Current element
+   *
+   * @throws ConcurrentModificationException if collection
+   *          has been modified
+   *
+   * @throws NoSuchElementException if {@code moveToNext}
+   *          has not been called, has not been called since the most
+   *          recent call to {@code reset}, or returned false
+   */
+  @Override public Object current() {
+    return rowConverter.toRow(curRecord);
+  }
+
+  /**
+   * Advances the enumerator to the next element of the collection.
+   *
+   * <p>After an enumerator is created or after the {@code reset} method is
+   * called, an enumerator is positioned before the first element of the
+   * collection, and the first call to the {@code moveNext} method moves the
+   * enumerator over the first element of the collection.
+   *
+   * <p>If {@code moveNext} passes the end of the collection, the enumerator
+   * is positioned after the last element in the collection and
+   * {@code moveNext} returns {@code false}. When the enumerator is at this
+   * position, subsequent calls to {@code moveNext} also return {@code false}
+   * until {@code #reset} is called.
+   *
+   * <p>An enumerator remains valid as long as the collection remains
+   * unchanged. If changes are made to the collection, such as adding,
+   * modifying, or deleting elements, the enumerator is irrecoverably
+   * invalidated. The next call to {@code moveNext} or {@link #reset} may,
+   * at the discretion of the implementation, throw a
+   * {@link ConcurrentModificationException}.
+   *
+   * @return {@code true} if the enumerator was successfully advanced to the
+   *         next element; {@code false} if the enumerator has passed the end 
of
+   *         the collection
+   */
+  @Override public boolean moveNext() {
+    while (bufferedRecords.isEmpty()) {
+      pullRecords();
+    }
+
+    curRecord = bufferedRecords.removeFirst();
+    return true;
+  }
+
+  private void pullRecords() {
+    ConsumerRecords<K, V> records = consumer.poll(Duration.ofMillis(100));
+    for (ConsumerRecord record : records) {
+      bufferedRecords.add(record);
+    }
+
+    consumer.commitSync();
+  }
+
+
+  /**
+   * Sets the enumerator to its initial position, which is before the first
 
 Review comment:
   `@inheritDoc` seems not valid, removed JavaDoc directly.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to