Repository: kylin Updated Branches: refs/heads/master 20ac92a32 -> 506cd7831
http://git-wip-us.apache.org/repos/asf/kylin/blob/506cd783/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java ---------------------------------------------------------------------- diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java index ab8b161..20c57a9 100644 --- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java +++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java @@ -1,20 +1,37 @@ /* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. -*/ + * + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * + * contributor license agreements. See the NOTICE file distributed with + * + * this work for additional information regarding copyright ownership. + * + * The ASF licenses this file to You under the Apache License, Version 2.0 + * + * (the "License"); you may not use this file except in compliance with + * + * the License. You may obtain a copy of the License at + * + * + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * + * + * Unless required by applicable law or agreed to in writing, software + * + * distributed under the License is distributed on an "AS IS" BASIS, + * + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * + * See the License for the specific language governing permissions and + * + * limitations under the License. + * + * / + */ + package org.apache.kylin.source.kafka; import java.io.IOException; http://git-wip-us.apache.org/repos/asf/kylin/blob/506cd783/source-kafka/src/main/java/org/apache/kylin/source/kafka/TopicMeta.java ---------------------------------------------------------------------- diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/TopicMeta.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/TopicMeta.java index d84d3db..4145ef6 100644 --- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/TopicMeta.java +++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/TopicMeta.java @@ -1,20 +1,37 @@ /* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. -*/ + * + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * + * contributor license agreements. See the NOTICE file distributed with + * + * this work for additional information regarding copyright ownership. + * + * The ASF licenses this file to You under the Apache License, Version 2.0 + * + * (the "License"); you may not use this file except in compliance with + * + * the License. You may obtain a copy of the License at + * + * + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * + * + * Unless required by applicable law or agreed to in writing, software + * + * distributed under the License is distributed on an "AS IS" BASIS, + * + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * + * See the License for the specific language governing permissions and + * + * limitations under the License. + * + * / + */ + package org.apache.kylin.source.kafka; import java.util.Collections; http://git-wip-us.apache.org/repos/asf/kylin/blob/506cd783/source-kafka/src/main/java/org/apache/kylin/source/kafka/UpdateTimeRangeStep.java ---------------------------------------------------------------------- diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/UpdateTimeRangeStep.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/UpdateTimeRangeStep.java deleted file mode 100644 index bb64bf9..0000000 --- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/UpdateTimeRangeStep.java +++ /dev/null @@ -1,108 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. -*/ -package org.apache.kylin.source.kafka; - -import java.io.BufferedReader; -import java.io.IOException; -import java.io.InputStreamReader; - -import org.apache.commons.lang3.StringUtils; -import org.apache.commons.lang3.time.FastDateFormat; -import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.kylin.common.util.DateFormat; -import org.apache.kylin.cube.CubeInstance; -import org.apache.kylin.cube.CubeManager; -import org.apache.kylin.cube.CubeSegment; -import org.apache.kylin.cube.CubeUpdate; -import org.apache.kylin.engine.mr.HadoopUtil; -import org.apache.kylin.engine.mr.common.BatchConstants; -import org.apache.kylin.engine.mr.steps.CubingExecutableUtil; -import org.apache.kylin.job.exception.ExecuteException; -import org.apache.kylin.job.execution.AbstractExecutable; -import org.apache.kylin.job.execution.ExecutableContext; -import org.apache.kylin.job.execution.ExecuteResult; -import org.apache.kylin.metadata.datatype.DataType; -import org.apache.kylin.metadata.model.TblColRef; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - */ -public class UpdateTimeRangeStep extends AbstractExecutable { - - private static final Logger logger = LoggerFactory.getLogger(UpdateTimeRangeStep.class); - - public UpdateTimeRangeStep() { - super(); - } - - @Override - protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException { - final CubeManager cubeManager = CubeManager.getInstance(context.getConfig()); - final CubeInstance cube = cubeManager.getCube(CubingExecutableUtil.getCubeName(this.getParams())); - final CubeSegment segment = cube.getSegmentById(CubingExecutableUtil.getSegmentId(this.getParams())); - final TblColRef partitionCol = segment.getCubeDesc().getModel().getPartitionDesc().getPartitionDateColumnRef(); - final String outputPath = this.getParams().get(BatchConstants.CFG_OUTPUT_PATH); - final Path outputFile = new Path(outputPath, partitionCol.getName()); - - String minValue = null, maxValue = null, currentValue = null; - try (FileSystem fs = HadoopUtil.getFileSystem(outputPath); FSDataInputStream inputStream = fs.open(outputFile); BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(inputStream))) { - minValue = currentValue = bufferedReader.readLine(); - while (currentValue != null) { - maxValue = currentValue; - currentValue = bufferedReader.readLine(); - } - } catch (IOException e) { - logger.error("fail to read file " + outputFile, e); - return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage()); - } - - final DataType partitionColType = partitionCol.getType(); - FastDateFormat dateFormat; - if (partitionColType.isDate()) { - dateFormat = DateFormat.getDateFormat(DateFormat.DEFAULT_DATE_PATTERN); - } else if (partitionColType.isDatetime() || partitionColType.isTimestamp()) { - dateFormat = DateFormat.getDateFormat(DateFormat.DEFAULT_DATETIME_PATTERN_WITHOUT_MILLISECONDS); - } else if (partitionColType.isStringFamily()) { - String partitionDateFormat = segment.getCubeDesc().getModel().getPartitionDesc().getPartitionDateFormat(); - if (StringUtils.isEmpty(partitionDateFormat)) { - partitionDateFormat = DateFormat.DEFAULT_DATE_PATTERN; - } - dateFormat = DateFormat.getDateFormat(partitionDateFormat); - } else { - return new ExecuteResult(ExecuteResult.State.ERROR, "Type " + partitionColType + " is not valid partition column type"); - } - - try { - long startTime = dateFormat.parse(minValue).getTime(); - long endTime = dateFormat.parse(maxValue).getTime(); - CubeUpdate cubeBuilder = new CubeUpdate(cube); - segment.setDateRangeStart(startTime); - segment.setDateRangeEnd(endTime); - cubeBuilder.setToUpdateSegs(segment); - cubeManager.updateCube(cubeBuilder); - return new ExecuteResult(ExecuteResult.State.SUCCEED, "succeed"); - } catch (Exception e) { - logger.error("fail to update cube segment offset", e); - return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage()); - } - } - -} http://git-wip-us.apache.org/repos/asf/kylin/blob/506cd783/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaClusterConfig.java ---------------------------------------------------------------------- diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaClusterConfig.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaClusterConfig.java index 95349c2..04a66f6 100644 --- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaClusterConfig.java +++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaClusterConfig.java @@ -22,7 +22,6 @@ import java.util.List; import javax.annotation.Nullable; -import org.apache.kafka.common.protocol.SecurityProtocol; import org.apache.kylin.common.persistence.JsonSerializer; import org.apache.kylin.common.persistence.RootPersistentEntity; import org.apache.kylin.common.persistence.Serializer; @@ -68,7 +67,7 @@ public class KafkaClusterConfig extends RootPersistentEntity { @Nullable @Override public Broker apply(BrokerConfig input) { - return new Broker(input.getId(), input.getHost(), input.getPort(), SecurityProtocol.PLAINTEXT); + return new Broker(input.getId(), input.getHost(), input.getPort()); } }); } http://git-wip-us.apache.org/repos/asf/kylin/blob/506cd783/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java ---------------------------------------------------------------------- diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java deleted file mode 100644 index decfb60..0000000 --- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java +++ /dev/null @@ -1,165 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kylin.source.kafka.hadoop; - -import org.apache.kylin.source.kafka.util.KafkaClient; -import org.apache.kylin.source.kafka.util.KafkaOffsetMapping; -import org.apache.commons.cli.Options; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; -import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat; -import org.apache.hadoop.util.ToolRunner; -import org.apache.kylin.common.KylinConfig; -import org.apache.kylin.cube.CubeInstance; -import org.apache.kylin.cube.CubeManager; -import org.apache.kylin.cube.CubeSegment; -import org.apache.kylin.engine.mr.common.AbstractHadoopJob; -import org.apache.kylin.engine.mr.common.BatchConstants; -import org.apache.kylin.metadata.model.SegmentStatusEnum; -import org.apache.kylin.source.kafka.KafkaConfigManager; -import org.apache.kylin.source.kafka.config.KafkaConfig; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.util.Collections; -import java.util.Map; - -/** - * Run a Hadoop Job to process the stream data in kafka; - * Modified from the kafka-hadoop-loader in https://github.com/amient/kafka-hadoop-loader - */ -public class KafkaFlatTableJob extends AbstractHadoopJob { - protected static final Logger logger = LoggerFactory.getLogger(KafkaFlatTableJob.class); - - public static final String CONFIG_KAFKA_PARITION_MIN = "kafka.partition.min"; - public static final String CONFIG_KAFKA_PARITION_MAX = "kafka.partition.max"; - public static final String CONFIG_KAFKA_PARITION_START = "kafka.partition.start."; - public static final String CONFIG_KAFKA_PARITION_END = "kafka.partition.end."; - - public static final String CONFIG_KAFKA_BROKERS = "kafka.brokers"; - public static final String CONFIG_KAFKA_TOPIC = "kafka.topic"; - public static final String CONFIG_KAFKA_TIMEOUT = "kafka.connect.timeout"; - public static final String CONFIG_KAFKA_BUFFER_SIZE = "kafka.connect.buffer.size"; - public static final String CONFIG_KAFKA_CONSUMER_GROUP = "kafka.consumer.group"; - public static final String CONFIG_KAFKA_INPUT_FORMAT = "input.format"; - public static final String CONFIG_KAFKA_PARSER_NAME = "kafka.parser.name"; - @Override - public int run(String[] args) throws Exception { - Options options = new Options(); - - try { - options.addOption(OPTION_JOB_NAME); - options.addOption(OPTION_CUBE_NAME); - options.addOption(OPTION_OUTPUT_PATH); - options.addOption(OPTION_SEGMENT_NAME); - parseOptions(options, args); - - job = Job.getInstance(getConf(), getOptionValue(OPTION_JOB_NAME)); - String cubeName = getOptionValue(OPTION_CUBE_NAME); - Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH)); - - String segmentName = getOptionValue(OPTION_SEGMENT_NAME); - - // ---------------------------------------------------------------------------- - // add metadata to distributed cache - CubeManager cubeMgr = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()); - CubeInstance cube = cubeMgr.getCube(cubeName); - - job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName); - job.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_NAME, segmentName); - logger.info("Starting: " + job.getJobName()); - - setJobClasspath(job, cube.getConfig()); - - KafkaConfigManager kafkaConfigManager = KafkaConfigManager.getInstance(KylinConfig.getInstanceFromEnv()); - KafkaConfig kafkaConfig = kafkaConfigManager.getKafkaConfig(cube.getFactTable()); - String brokers = KafkaClient.getKafkaBrokers(kafkaConfig); - String topic = kafkaConfig.getTopic(); - - if (brokers == null || brokers.length() == 0 || topic == null) { - throw new IllegalArgumentException("Invalid Kafka information, brokers " + brokers + ", topic " + topic); - } - - job.getConfiguration().set(CONFIG_KAFKA_BROKERS, brokers); - job.getConfiguration().set(CONFIG_KAFKA_TOPIC, topic); - job.getConfiguration().set(CONFIG_KAFKA_TIMEOUT, String.valueOf(kafkaConfig.getTimeout())); - job.getConfiguration().set(CONFIG_KAFKA_BUFFER_SIZE, String.valueOf(kafkaConfig.getBufferSize())); - job.getConfiguration().set(CONFIG_KAFKA_INPUT_FORMAT, "json"); - job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName); - job.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_NAME, segmentName); - job.getConfiguration().set(CONFIG_KAFKA_PARSER_NAME, kafkaConfig.getParserName()); - job.getConfiguration().set(CONFIG_KAFKA_CONSUMER_GROUP, cubeName); // use cubeName as consumer group name - setupMapper(cube.getSegment(segmentName, SegmentStatusEnum.NEW)); - job.setNumReduceTasks(0); - FileOutputFormat.setOutputPath(job, output); - FileOutputFormat.setCompressOutput(job, true); - org.apache.log4j.Logger.getRootLogger().info("Output hdfs location: " + output); - org.apache.log4j.Logger.getRootLogger().info("Output hdfs compression: " + true); - job.getConfiguration().set(BatchConstants.CFG_OUTPUT_PATH, output.toString()); - - deletePath(job.getConfiguration(), output); - - attachKylinPropsAndMetadata(cube, job.getConfiguration()); - - return waitForCompletion(job); - - } catch (Exception e) { - logger.error("error in KafkaFlatTableJob", e); - printUsage(options); - throw e; - } finally { - if (job != null) - cleanupTempConfFile(job.getConfiguration()); - } - - } - - private void setupMapper(CubeSegment cubeSeg) throws IOException { - // set the segment's offset info to job conf - Map<Integer, Long> offsetStart = KafkaOffsetMapping.parseOffsetStart(cubeSeg); - Map<Integer, Long> offsetEnd = KafkaOffsetMapping.parseOffsetEnd(cubeSeg); - - Integer minPartition = Collections.min(offsetStart.keySet()); - Integer maxPartition = Collections.max(offsetStart.keySet()); - job.getConfiguration().set(CONFIG_KAFKA_PARITION_MIN, minPartition.toString()); - job.getConfiguration().set(CONFIG_KAFKA_PARITION_MAX, maxPartition.toString()); - - for(Integer partition: offsetStart.keySet()) { - job.getConfiguration().set(CONFIG_KAFKA_PARITION_START + partition, offsetStart.get(partition).toString()); - job.getConfiguration().set(CONFIG_KAFKA_PARITION_END + partition, offsetEnd.get(partition).toString()); - } - - job.setMapperClass(KafkaFlatTableMapper.class); - job.setInputFormatClass(KafkaInputFormat.class); - job.setOutputKeyClass(Text.class); - job.setOutputValueClass(Text.class); - job.setOutputFormatClass(SequenceFileOutputFormat.class); - job.setNumReduceTasks(0); - } - - public static void main(String[] args) throws Exception { - KafkaFlatTableJob job = new KafkaFlatTableJob(); - int exitCode = ToolRunner.run(job, args); - System.exit(exitCode); - } - -} http://git-wip-us.apache.org/repos/asf/kylin/blob/506cd783/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableMapper.java ---------------------------------------------------------------------- diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableMapper.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableMapper.java deleted file mode 100644 index 995b2d4..0000000 --- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableMapper.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kylin.source.kafka.hadoop; - -import java.io.IOException; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.io.BytesWritable; -import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.io.Text; -import org.apache.kylin.common.util.Bytes; -import org.apache.kylin.engine.mr.KylinMapper; - -public class KafkaFlatTableMapper extends KylinMapper<LongWritable, BytesWritable, Text, Text> { - - private Text outKey = new Text(); - private Text outValue = new Text(); - - @Override - protected void setup(Context context) throws IOException { - Configuration conf = context.getConfiguration(); - bindCurrentConfiguration(conf); - } - - @Override - public void map(LongWritable key, BytesWritable value, Context context) throws IOException { - try { - outKey.set(Bytes.toBytes(key.get())); - outValue.set(value.getBytes(), 0, value.getLength()); - context.write(outKey, outValue); - } catch (InterruptedException e) { - e.printStackTrace(); - } - } -} http://git-wip-us.apache.org/repos/asf/kylin/blob/506cd783/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputFormat.java ---------------------------------------------------------------------- diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputFormat.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputFormat.java deleted file mode 100644 index 81f6bac..0000000 --- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputFormat.java +++ /dev/null @@ -1,98 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kylin.source.kafka.hadoop; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; - -import com.google.common.base.Preconditions; -import com.google.common.collect.Maps; -import org.apache.kylin.source.kafka.util.KafkaClient; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.io.BytesWritable; -import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.mapreduce.InputFormat; -import org.apache.hadoop.mapreduce.InputSplit; -import org.apache.hadoop.mapreduce.JobContext; -import org.apache.hadoop.mapreduce.RecordReader; -import org.apache.hadoop.mapreduce.TaskAttemptContext; -import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.common.PartitionInfo; - -/** - * Convert Kafka topic to Hadoop InputFormat - * Modified from the kafka-hadoop-loader in https://github.com/amient/kafka-hadoop-loader - */ -public class KafkaInputFormat extends InputFormat<LongWritable, BytesWritable> { - - @Override - public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException { - Configuration conf = context.getConfiguration(); - - String brokers = conf.get(KafkaFlatTableJob.CONFIG_KAFKA_BROKERS); - String inputTopic = conf.get(KafkaFlatTableJob.CONFIG_KAFKA_TOPIC); - String consumerGroup = conf.get(KafkaFlatTableJob.CONFIG_KAFKA_CONSUMER_GROUP); - Integer partitionMin = Integer.valueOf(conf.get(KafkaFlatTableJob.CONFIG_KAFKA_PARITION_MIN)); - Integer partitionMax = Integer.valueOf(conf.get(KafkaFlatTableJob.CONFIG_KAFKA_PARITION_MAX)); - - Map<Integer, Long> startOffsetMap = Maps.newHashMap(); - Map<Integer, Long> endOffsetMap = Maps.newHashMap(); - for (int i = partitionMin; i <= partitionMax; i++) { - String start = conf.get(KafkaFlatTableJob.CONFIG_KAFKA_PARITION_START + i); - String end = conf.get(KafkaFlatTableJob.CONFIG_KAFKA_PARITION_END + i); - if (start != null && end != null) { - startOffsetMap.put(i, Long.valueOf(start)); - endOffsetMap.put(i, Long.valueOf(end)); - } - } - - List<InputSplit> splits = new ArrayList<InputSplit>(); - try (KafkaConsumer<String, String> consumer = KafkaClient.getKafkaConsumer(brokers, consumerGroup, null)) { - List<PartitionInfo> partitionInfos = consumer.partitionsFor(inputTopic); - Preconditions.checkArgument(partitionInfos.size() == startOffsetMap.size(), "partition number mismatch with server side"); - for (int i = 0; i < partitionInfos.size(); i++) { - PartitionInfo partition = partitionInfos.get(i); - int partitionId = partition.partition(); - if (startOffsetMap.containsKey(partitionId) == false) { - throw new IllegalStateException("Partition '" + partitionId + "' not exists."); - } - - if (endOffsetMap.get(partitionId) > startOffsetMap.get(partitionId)) { - InputSplit split = new KafkaInputSplit( - brokers, inputTopic, - partitionId, - startOffsetMap.get(partitionId), endOffsetMap.get(partitionId) - ); - splits.add(split); - } - } - } - return splits; - } - - @Override - public RecordReader<LongWritable, BytesWritable> createRecordReader( - InputSplit arg0, TaskAttemptContext arg1) throws IOException, - InterruptedException { - return new KafkaInputRecordReader(); - } - -} http://git-wip-us.apache.org/repos/asf/kylin/blob/506cd783/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputRecordReader.java ---------------------------------------------------------------------- diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputRecordReader.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputRecordReader.java deleted file mode 100644 index f67fef5..0000000 --- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputRecordReader.java +++ /dev/null @@ -1,166 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kylin.source.kafka.hadoop; - -import java.io.IOException; -import java.util.Arrays; -import java.util.Iterator; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.io.BytesWritable; -import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.mapreduce.InputSplit; -import org.apache.hadoop.mapreduce.RecordReader; -import org.apache.hadoop.mapreduce.TaskAttemptContext; -import org.apache.kafka.clients.consumer.Consumer; -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.clients.consumer.ConsumerRecords; -import org.apache.kafka.common.TopicPartition; -import org.apache.kylin.common.util.Bytes; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Convert Kafka topic to Hadoop InputFormat - * Modified from the kafka-hadoop-loader in https://github.com/amient/kafka-hadoop-loader - */ -public class KafkaInputRecordReader extends RecordReader<LongWritable, BytesWritable> { - - static Logger log = LoggerFactory.getLogger(KafkaInputRecordReader.class); - - private Configuration conf; - - private KafkaInputSplit split; - private Consumer consumer; - private String brokers; - private String topic; - - private int partition; - private long earliestOffset; - private long watermark; - private long latestOffset; - - private ConsumerRecords<String, String> messages; - private Iterator<ConsumerRecord<String, String>> iterator; - private LongWritable key; - private BytesWritable value; - - private long timeOut = 60000; - private long bufferSize = 65536; - - private long numProcessedMessages = 0L; - - @Override - public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException { - initialize(split, context.getConfiguration()); - } - - public void initialize(InputSplit split, Configuration conf) throws IOException, InterruptedException { - this.conf = conf; - this.split = (KafkaInputSplit) split; - brokers = this.split.getBrokers(); - topic = this.split.getTopic(); - partition = this.split.getPartition(); - watermark = this.split.getOffsetStart(); - - if (conf.get(KafkaFlatTableJob.CONFIG_KAFKA_TIMEOUT) != null) { - timeOut = Long.parseLong(conf.get(KafkaFlatTableJob.CONFIG_KAFKA_TIMEOUT)); - } - if (conf.get(KafkaFlatTableJob.CONFIG_KAFKA_BUFFER_SIZE) != null) { - bufferSize = Long.parseLong(conf.get(KafkaFlatTableJob.CONFIG_KAFKA_BUFFER_SIZE)); - } - - String consumerGroup = conf.get(KafkaFlatTableJob.CONFIG_KAFKA_CONSUMER_GROUP); - consumer = org.apache.kylin.source.kafka.util.KafkaClient.getKafkaConsumer(brokers, consumerGroup, null); - - earliestOffset = this.split.getOffsetStart(); - latestOffset = this.split.getOffsetEnd(); - TopicPartition topicPartition = new TopicPartition(topic, partition); - consumer.assign(Arrays.asList(topicPartition)); - log.info("Split {} Topic: {} Broker: {} Partition: {} Start: {} End: {}", new Object[] { this.split, topic, this.split.getBrokers(), partition, earliestOffset, latestOffset }); - } - - @Override - public boolean nextKeyValue() throws IOException, InterruptedException { - if (key == null) { - key = new LongWritable(); - } - if (value == null) { - value = new BytesWritable(); - } - - if (messages == null) { - log.info("{} fetching offset {} ", topic + ":" + split.getBrokers() + ":" + partition, watermark); - TopicPartition topicPartition = new TopicPartition(topic, partition); - consumer.seek(topicPartition, watermark); - messages = consumer.poll(timeOut); - iterator = messages.iterator(); - if (!iterator.hasNext()) { - log.info("No more messages, stop"); - throw new IOException(String.format("Unexpected ending of stream, expected ending offset %d, but end at %d", latestOffset, watermark)); - } - } - - if (iterator.hasNext()) { - ConsumerRecord<String, String> message = iterator.next(); - if (message.offset() >= latestOffset) { - log.info("Reach the end offset, stop reading."); - return false; - } - key.set(message.offset()); - byte[] valuebytes = Bytes.toBytes(message.value()); - value.set(valuebytes, 0, valuebytes.length); - watermark = message.offset() + 1; - numProcessedMessages++; - if (!iterator.hasNext()) { - messages = null; - iterator = null; - } - return true; - } - - log.error("Unexpected iterator end."); - throw new IOException(String.format("Unexpected ending of stream, expected ending offset %d, but end at %d", latestOffset, watermark)); - } - - @Override - public LongWritable getCurrentKey() throws IOException, InterruptedException { - return key; - } - - @Override - public BytesWritable getCurrentValue() throws IOException, InterruptedException { - return value; - } - - @Override - public float getProgress() throws IOException, InterruptedException { - if (watermark >= latestOffset || earliestOffset == latestOffset) { - return 1.0f; - } - return Math.min(1.0f, (watermark - earliestOffset) / (float) (latestOffset - earliestOffset)); - } - - @Override - public void close() throws IOException { - log.info("{} num. processed messages {} ", topic + ":" + split.getBrokers() + ":" + partition, numProcessedMessages); - consumer.close(); - } - -} http://git-wip-us.apache.org/repos/asf/kylin/blob/506cd783/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputSplit.java ---------------------------------------------------------------------- diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputSplit.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputSplit.java deleted file mode 100644 index 3261399..0000000 --- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputSplit.java +++ /dev/null @@ -1,102 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kylin.source.kafka.hadoop; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; - -import org.apache.hadoop.io.Text; -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.mapreduce.InputSplit; - -/** - * Convert Kafka topic to Hadoop InputFormat - * Modified from the kafka-hadoop-loader in https://github.com/amient/kafka-hadoop-loader - */ -public class KafkaInputSplit extends InputSplit implements Writable { - - private String brokers; - private String topic; - private int partition; - private long offsetStart; - private long offsetEnd; - - public KafkaInputSplit() { - } - - public KafkaInputSplit(String brokers, String topic, int partition, long offsetStart, long offsetEnd) { - this.brokers = brokers; - this.topic = topic; - this.partition = partition; - this.offsetStart = offsetStart; - this.offsetEnd = offsetEnd; - } - - public void readFields(DataInput in) throws IOException { - brokers = Text.readString(in); - topic = Text.readString(in); - partition = in.readInt(); - offsetStart = in.readLong(); - offsetEnd = in.readLong(); - } - - public void write(DataOutput out) throws IOException { - Text.writeString(out, brokers); - Text.writeString(out, topic); - out.writeInt(partition); - out.writeLong(offsetStart); - out.writeLong(offsetEnd); - } - - @Override - public long getLength() throws IOException, InterruptedException { - return Long.MAX_VALUE; - } - - @Override - public String[] getLocations() throws IOException, InterruptedException { - return new String[]{brokers}; - } - - public int getPartition() { - return partition; - } - - public String getTopic() { - return topic; - } - - public String getBrokers() { - return brokers; - } - - public long getOffsetStart() { - return offsetStart; - } - - public long getOffsetEnd() { - return offsetEnd; - } - - @Override - public String toString() { - return brokers + "-" + topic + "-" + partition + "-" + offsetStart + "-" + offsetEnd; - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kylin/blob/506cd783/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaClient.java ---------------------------------------------------------------------- diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaClient.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaClient.java deleted file mode 100644 index 640cc53..0000000 --- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaClient.java +++ /dev/null @@ -1,115 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. -*/ -package org.apache.kylin.source.kafka.util; - -import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.clients.producer.KafkaProducer; -import org.apache.kafka.common.TopicPartition; -import org.apache.kylin.source.kafka.config.BrokerConfig; -import org.apache.kylin.source.kafka.config.KafkaClusterConfig; -import org.apache.kylin.source.kafka.config.KafkaConfig; - -import java.util.Arrays; -import java.util.Map; -import java.util.Properties; - -/** - */ -public class KafkaClient { - - public static KafkaConsumer getKafkaConsumer(String brokers, String consumerGroup, Properties properties) { - Properties props = constructDefaultKafkaConsumerProperties(brokers, consumerGroup, properties); - KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); - return consumer; - } - - public static KafkaProducer getKafkaProducer(String brokers, Properties properties) { - Properties props = constructDefaultKafkaProducerProperties(brokers, properties); - KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props); - return producer; - } - - private static Properties constructDefaultKafkaProducerProperties(String brokers, Properties properties){ - Properties props = new Properties(); - props.put("bootstrap.servers", brokers); - props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); - props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); - props.put("acks", "1"); - props.put("buffer.memory", 33554432); - props.put("retries", 0); - props.put("batch.size", 16384); - props.put("linger.ms", 50); - props.put("timeout.ms", "30000"); - if (properties != null) { - for (Map.Entry entry : properties.entrySet()) { - props.put(entry.getKey(), entry.getValue()); - } - } - return props; - } - - private static Properties constructDefaultKafkaConsumerProperties(String brokers, String consumerGroup, Properties properties) { - Properties props = new Properties(); - props.put("bootstrap.servers", brokers); - props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); - props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); - props.put("group.id", consumerGroup); - props.put("session.timeout.ms", "30000"); - props.put("enable.auto.commit", "false"); - if (properties != null) { - for (Map.Entry entry : properties.entrySet()) { - props.put(entry.getKey(), entry.getValue()); - } - } - return props; - } - - public static String getKafkaBrokers(KafkaConfig kafkaConfig) { - String brokers = null; - for (KafkaClusterConfig clusterConfig : kafkaConfig.getKafkaClusterConfigs()) { - for (BrokerConfig brokerConfig : clusterConfig.getBrokerConfigs()) { - if (brokers == null) { - brokers = brokerConfig.getHost() + ":" + brokerConfig.getPort(); - } else { - brokers = brokers + "," + brokerConfig.getHost() + ":" + brokerConfig.getPort(); - } - } - } - return brokers; - } - - public static long getEarliestOffset(KafkaConsumer consumer, String topic, int partitionId) { - - TopicPartition topicPartition = new TopicPartition(topic, partitionId); - consumer.assign(Arrays.asList(topicPartition)); - consumer.seekToBeginning(Arrays.asList(topicPartition)); - - return consumer.position(topicPartition); - } - - public static long getLatestOffset(KafkaConsumer consumer, String topic, int partitionId) { - - TopicPartition topicPartition = new TopicPartition(topic, partitionId); - consumer.assign(Arrays.asList(topicPartition)); - consumer.seekToEnd(Arrays.asList(topicPartition)); - - return consumer.position(topicPartition); - } - - -} http://git-wip-us.apache.org/repos/asf/kylin/blob/506cd783/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaOffsetMapping.java ---------------------------------------------------------------------- diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaOffsetMapping.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaOffsetMapping.java deleted file mode 100644 index b46e57f..0000000 --- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaOffsetMapping.java +++ /dev/null @@ -1,97 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. -*/ -package org.apache.kylin.source.kafka.util; - -import com.google.common.collect.Maps; -import org.apache.kylin.cube.CubeSegment; - -import java.util.Map; - -/** - */ -public class KafkaOffsetMapping { - - public static final String OFFSET_START = "kafka.offset.start."; - public static final String OFFSET_END = "kafka.offset.end."; - - /** - * Get the start offsets for each partition from a segment - * - * @param segment - * @return - */ - public static Map<Integer, Long> parseOffsetStart(CubeSegment segment) { - return parseOffset(segment, OFFSET_START); - } - - /** - * Get the end offsets for each partition from a segment - * - * @param segment - * @return - */ - public static Map<Integer, Long> parseOffsetEnd(CubeSegment segment) { - return parseOffset(segment, OFFSET_END); - } - - /** - * Save the partition start offset to cube segment - * - * @param segment - * @param offsetStart - */ - public static void saveOffsetStart(CubeSegment segment, Map<Integer, Long> offsetStart) { - long sourceOffsetStart = 0; - for (Integer partition : offsetStart.keySet()) { - segment.getAdditionalInfo().put(OFFSET_START + partition, String.valueOf(offsetStart.get(partition))); - sourceOffsetStart += offsetStart.get(partition); - } - - segment.setSourceOffsetStart(sourceOffsetStart); - } - - /** - * Save the partition end offset to cube segment - * - * @param segment - * @param offsetEnd - */ - public static void saveOffsetEnd(CubeSegment segment, Map<Integer, Long> offsetEnd) { - long sourceOffsetEnd = 0; - for (Integer partition : offsetEnd.keySet()) { - segment.getAdditionalInfo().put(OFFSET_END + partition, String.valueOf(offsetEnd.get(partition))); - sourceOffsetEnd += offsetEnd.get(partition); - } - - segment.setSourceOffsetEnd(sourceOffsetEnd); - } - - private static Map<Integer, Long> parseOffset(CubeSegment segment, String propertyPrefix) { - final Map<Integer, Long> offsetStartMap = Maps.newHashMap(); - for (String key : segment.getAdditionalInfo().keySet()) { - if (key.startsWith(propertyPrefix)) { - Integer partition = Integer.valueOf(key.substring(propertyPrefix.length())); - Long offset = Long.valueOf(segment.getAdditionalInfo().get(key)); - offsetStartMap.put(partition, offset); - } - } - - - return offsetStartMap; - } -} http://git-wip-us.apache.org/repos/asf/kylin/blob/506cd783/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaRequester.java ---------------------------------------------------------------------- diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaRequester.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaRequester.java index ddc2eb7..58cba7d 100644 --- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaRequester.java +++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaRequester.java @@ -1,20 +1,37 @@ /* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. -*/ + * + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * + * contributor license agreements. See the NOTICE file distributed with + * + * this work for additional information regarding copyright ownership. + * + * The ASF licenses this file to You under the Apache License, Version 2.0 + * + * (the "License"); you may not use this file except in compliance with + * + * the License. You may obtain a copy of the License at + * + * + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * + * + * Unless required by applicable law or agreed to in writing, software + * + * distributed under the License is distributed on an "AS IS" BASIS, + * + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * + * See the License for the specific language governing permissions and + * + * limitations under the License. + * + * / + */ + package org.apache.kylin.source.kafka.util; import java.util.Collections; @@ -25,8 +42,6 @@ import java.util.concurrent.ConcurrentMap; import javax.annotation.Nullable; -import kafka.cluster.BrokerEndPoint; -import org.apache.kafka.common.protocol.SecurityProtocol; import org.apache.kylin.source.kafka.TopicMeta; import org.apache.kylin.source.kafka.config.KafkaClusterConfig; import org.slf4j.Logger; @@ -71,14 +86,13 @@ public final class KafkaRequester { if (consumerCache.containsKey(key)) { return consumerCache.get(key); } else { - BrokerEndPoint brokerEndPoint = broker.getBrokerEndPoint(SecurityProtocol.PLAINTEXT); - consumerCache.putIfAbsent(key, new SimpleConsumer(brokerEndPoint.host(), brokerEndPoint.port(), timeout, bufferSize, clientId)); + consumerCache.putIfAbsent(key, new SimpleConsumer(broker.host(), broker.port(), timeout, bufferSize, clientId)); return consumerCache.get(key); } } private static String createKey(Broker broker, int timeout, int bufferSize, String clientId) { - return broker.getBrokerEndPoint(SecurityProtocol.PLAINTEXT).connectionString() + "_" + timeout + "_" + bufferSize + "_" + clientId; + return broker.getConnectionString() + "_" + timeout + "_" + bufferSize + "_" + clientId; } public static TopicMeta getKafkaTopicMeta(KafkaClusterConfig kafkaClusterConfig) { http://git-wip-us.apache.org/repos/asf/kylin/blob/506cd783/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaUtils.java ---------------------------------------------------------------------- diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaUtils.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaUtils.java index ee5bb20..24eaa05 100644 --- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaUtils.java +++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaUtils.java @@ -22,7 +22,6 @@ import java.nio.ByteBuffer; import java.util.Iterator; import java.util.Map; -import org.apache.kafka.common.protocol.SecurityProtocol; import org.apache.kylin.common.util.Pair; import org.apache.kylin.common.util.StreamingMessage; import org.apache.kylin.source.kafka.StreamingParser; @@ -56,7 +55,7 @@ public final class KafkaUtils { if (partitionMetadata.errorCode() != 0) { logger.warn("PartitionMetadata errorCode: " + partitionMetadata.errorCode()); } - return new Broker(partitionMetadata.leader(), SecurityProtocol.PLAINTEXT); + return partitionMetadata.leader(); } else { return null; } http://git-wip-us.apache.org/repos/asf/kylin/blob/506cd783/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java index f285153..c7de287 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java @@ -39,7 +39,7 @@ import org.apache.kylin.common.util.CompressionUtils; import org.apache.kylin.common.util.ImmutableBitSet; import org.apache.kylin.common.util.LoggableCachedThreadPool; import org.apache.kylin.common.util.Pair; -import org.apache.kylin.metadata.model.ISegment; +import org.apache.kylin.cube.ISegment; import org.apache.kylin.cube.cuboid.Cuboid; import org.apache.kylin.gridtable.GTInfo; import org.apache.kylin.gridtable.GTScanRequest; http://git-wip-us.apache.org/repos/asf/kylin/blob/506cd783/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java index da087c9..c318cba 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java @@ -31,7 +31,7 @@ import org.apache.kylin.common.util.Bytes; import org.apache.kylin.common.util.ImmutableBitSet; import org.apache.kylin.common.util.Pair; import org.apache.kylin.cube.CubeSegment; -import org.apache.kylin.metadata.model.ISegment; +import org.apache.kylin.cube.ISegment; import org.apache.kylin.cube.cuboid.Cuboid; import org.apache.kylin.cube.kv.FuzzyKeyEncoder; import org.apache.kylin.cube.kv.FuzzyMaskEncoder; http://git-wip-us.apache.org/repos/asf/kylin/blob/506cd783/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java index 5692000..f1e5dab 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java @@ -32,7 +32,7 @@ import org.apache.hadoop.hbase.client.Scan; import org.apache.kylin.common.util.BytesUtil; import org.apache.kylin.common.util.ImmutableBitSet; import org.apache.kylin.common.util.ShardingHash; -import org.apache.kylin.metadata.model.ISegment; +import org.apache.kylin.cube.ISegment; import org.apache.kylin.cube.cuboid.Cuboid; import org.apache.kylin.cube.kv.RowConstants; import org.apache.kylin.dimension.DimensionEncoding;