[ https://issues.apache.org/jira/browse/TAJO-1487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15424411#comment-15424411 ]
ASF GitHub Bot commented on TAJO-1487:
--------------------------------------
Github user jinossy commented on a diff in the pull request:
https://github.com/apache/tajo/pull/1043#discussion_r75111322
--- Diff: tajo-storage/tajo-storage-kafka/src/main/java/org/apache/tajo/storage/kafka/KafkaScanner.java ---
@@ -0,0 +1,274 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.kafka;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.catalog.statistics.TableStats;
+import org.apache.tajo.exception.TajoRuntimeException;
+import org.apache.tajo.exception.UnsupportedException;
+import org.apache.tajo.plan.expr.EvalNode;
+import org.apache.tajo.plan.logical.LogicalNode;
+import org.apache.tajo.storage.EmptyTuple;
+import org.apache.tajo.storage.Scanner;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.VTuple;
+import org.apache.tajo.storage.fragment.Fragment;
+import org.apache.tajo.storage.kafka.KafkaFragment.KafkaFragmentKey;
+import org.apache.tajo.storage.text.DelimitedTextFile;
+import org.apache.tajo.storage.text.TextLineDeserializer;
+import org.apache.tajo.storage.text.TextLineParsingError;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+
+public class KafkaScanner implements Scanner {
+ private static final Log LOG = LogFactory.getLog(KafkaScanner.class);
+ protected boolean inited = false;
+
+ private int numRows = 0;
+ private int readRecordIndex = -1;
+ private int fragmentSize;
+
+ private long pollTimeout;
+
+ private KafkaFragmentKey startKey;
+ private KafkaFragmentKey endKey;
+ private long currentOffset;
+
+ private SimpleConsumerManager simpleConsumerManager;
+
+ private List<ConsumerRecord<byte[], byte[]>> records = null;
+
+ private Schema schema;
+ private TableMeta meta;
+ private TableStats tableStats;
+ private KafkaFragment fragment;
+ private Column[] targets;
+ private TextLineDeserializer deserializer;
+ private AtomicBoolean finished = new AtomicBoolean(false);
--- End diff --
There is no race condition in the scanner.
> Kafka Scanner for Kafka storage.
> --------------------------------
>
> Key: TAJO-1487
> URL: https://issues.apache.org/jira/browse/TAJO-1487
> Project: Tajo
> Issue Type: Sub-task
> Components: Storage
> Reporter: YeonSu Han
> Assignee: Byunghwa Yun
> Labels: kafka_storage
>
> The Scanner interface has to be implemented for Kafka storage.
> A scan is split into many fragments, and Scanner and Fragment map
> one to one.
> This requires the following:
> - Create a Fragment class.
> - Implement the 'Tuple next()' method in Scanner.
> - etc.
> This issue depends on TAJO-1480 and TAJO-1502, so those two must be
> committed first.
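
The one-to-one relationship between Scanner and Fragment described in the issue can be illustrated with a minimal, self-contained sketch. This is not the code from the pull request and it does not use the Tajo or Kafka APIs: the names FragmentScanSketch, OffsetFragment, FragmentScanner, split and scanAll are hypothetical, a partition is modelled as an in-memory list of strings, and returning null from next() at the end of a fragment is an assumption made for the illustration. The `finished` flag is a plain boolean on the assumption that each scanner is driven by a single thread.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class FragmentScanSketch {

  /** Hypothetical fragment: a half-open offset range [startOffset, endOffset). */
  static final class OffsetFragment {
    final int startOffset;
    final int endOffset;

    OffsetFragment(int startOffset, int endOffset) {
      this.startOffset = startOffset;
      this.endOffset = endOffset;
    }
  }

  /** Hypothetical scanner bound to exactly one fragment. */
  static final class FragmentScanner {
    private final List<String> partition;
    private final OffsetFragment fragment;
    private int currentOffset;
    // Plain boolean: this sketch assumes a scanner is used by one thread only.
    private boolean finished = false;

    FragmentScanner(List<String> partition, OffsetFragment fragment) {
      this.partition = partition;
      this.fragment = fragment;
      this.currentOffset = fragment.startOffset;
    }

    /** Returns the next record of the fragment, or null once it is exhausted. */
    String next() {
      if (finished
          || currentOffset >= fragment.endOffset
          || currentOffset >= partition.size()) {
        finished = true;
        return null;
      }
      return partition.get(currentOffset++);
    }
  }

  /** Splits the range [0, totalRecords) into fragments of at most fragmentSize records. */
  static List<OffsetFragment> split(int totalRecords, int fragmentSize) {
    if (fragmentSize <= 0) {
      throw new IllegalArgumentException("fragmentSize must be positive");
    }
    List<OffsetFragment> fragments = new ArrayList<>();
    for (int start = 0; start < totalRecords; start += fragmentSize) {
      int end = Math.min(start + fragmentSize, totalRecords);
      fragments.add(new OffsetFragment(start, end));
    }
    return fragments;
  }

  /** Creates one scanner per fragment and collects all records in offset order. */
  static List<String> scanAll(List<String> partition, int fragmentSize) {
    List<String> out = new ArrayList<>();
    for (OffsetFragment fragment : split(partition.size(), fragmentSize)) {
      FragmentScanner scanner = new FragmentScanner(partition, fragment);
      for (String record = scanner.next(); record != null; record = scanner.next()) {
        out.add(record);
      }
    }
    return out;
  }

  public static void main(String[] args) {
    List<String> partition = Arrays.asList("a", "b", "c", "d", "e");
    System.out.println("fragments: " + split(partition.size(), 2).size());
    System.out.println("records:   " + scanAll(partition, 2));
  }
}
```

Because every scanner only ever sees its own offset range, fragments can be scanned independently of each other, which is the property the issue description relies on.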