http://git-wip-us.apache.org/repos/asf/kylin/blob/1c4deab9/server/src/test/java/org/apache/kylin/rest/service/TestBaseWithZookeeper.java ---------------------------------------------------------------------- diff --git a/server/src/test/java/org/apache/kylin/rest/service/TestBaseWithZookeeper.java b/server/src/test/java/org/apache/kylin/rest/service/TestBaseWithZookeeper.java new file mode 100644 index 0000000..3182c16 --- /dev/null +++ b/server/src/test/java/org/apache/kylin/rest/service/TestBaseWithZookeeper.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+*/
+
+package org.apache.kylin.rest.service;
+
+import org.I0Itec.zkclient.IDefaultNameSpace;
+import org.I0Itec.zkclient.ZkClient;
+import org.I0Itec.zkclient.ZkServer;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.kylin.common.util.LocalFileMetadataTestCase;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.springframework.security.authentication.TestingAuthenticationToken;
+import org.springframework.security.core.Authentication;
+import org.springframework.security.core.context.SecurityContextHolder;
+
+import java.io.File;
+
+/**
+ * Base class for tests that need a local ZooKeeper server in addition to the
+ * local-file test metadata.
+ *
+ * <p>Before the test class runs, a {@link ZkServer} is started on port 2199 with
+ * its data and log directories under a fresh temporary directory, and the system
+ * property {@code kylin.zookeeper.address} is set to {@link #zkAddress}. After
+ * the test class finishes, the server is shut down, the property is reset to the
+ * empty string and the test metadata is cleaned up.
+ */
+public class TestBaseWithZookeeper extends LocalFileMetadataTestCase {
+    /** Port of the embedded ZooKeeper server; shared by {@link #zkAddress} and the server itself. */
+    private static final int ZK_PORT = 2199;
+
+    /** Address ("host:port") published through {@code kylin.zookeeper.address}. */
+    protected static final String zkAddress = "localhost:" + ZK_PORT;
+
+    /** The embedded server, or {@code null} while none is running. */
+    static ZkServer server;
+
+    /** {@code true} once {@link #setupResource()} has started the server. */
+    static boolean zkStarted = false;
+
+    /**
+     * Creates the test metadata and, unless a server is already running, starts
+     * the embedded ZooKeeper server and publishes its address.
+     *
+     * @throws Exception if the temporary directory cannot be created or the
+     *                   server fails to start
+     */
+    @BeforeClass
+    public static void setupResource() throws Exception {
+        staticCreateTestMetadata();
+
+        if (!zkStarted) {
+            // createTempFile only reserves a unique name; the file is replaced by a
+            // directory of the same name that holds the server's data and log dirs.
+            final File tmpDir = File.createTempFile("KylinTest", null);
+            FileUtil.fullyDelete(tmpDir);
+            tmpDir.mkdirs();
+            tmpDir.deleteOnExit();
+
+            // NOTE(review): 1000 and 2000 are presumably the tick time and the minimum
+            // session timeout in milliseconds - confirm against the ZkServer constructor.
+            server = new ZkServer(tmpDir.getAbsolutePath() + "/dataDir", tmpDir.getAbsolutePath() + "/logDir", new IDefaultNameSpace() {
+                @Override
+                public void createDefaultNameSpace(ZkClient zkClient) {
+                    // no default namespace is created for the tests
+                }
+            }, ZK_PORT, 1000, 2000);
+
+            server.start();
+            zkStarted = true;
+            System.setProperty("kylin.zookeeper.address", zkAddress);
+        }
+
+    }
+
+    /**
+     * Shuts down the embedded ZooKeeper server if one was created, resets the
+     * published address and cleans up the test metadata.
+     */
+    @AfterClass
+    public static void tearDownResource() {
+        try {
+            // Only an existing server can be shut down; a null check in the opposite
+            // direction would dereference null and leave a running server alive.
+            if (server != null) {
+                server.shutdown();
+                server = null;
+                zkStarted = false;
+                System.setProperty("kylin.zookeeper.address", "");
+            }
+        } finally {
+            // Metadata cleanup must run even when the shutdown fails.
+            staticCleanupTestMetadata();
+        }
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/1c4deab9/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java ---------------------------------------------------------------------- diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java index 0907623..b075387 100644 --- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java +++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java @@ -40,7 +40,10 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ConcurrentSkipListMap; +import com.google.common.collect.Maps; import kafka.message.MessageAndOffset; import org.apache.commons.lang3.StringUtils; @@ -102,7 +105,9 @@ public final class TimedJsonStreamParser extends StreamingParser { @Override public StreamingMessage parse(MessageAndOffset messageAndOffset) { try { - Map<String, String> root = mapper.readValue(new ByteBufferBackedInputStream(messageAndOffset.message().payload()), mapType); + Map<String, String> message = mapper.readValue(new ByteBufferBackedInputStream(messageAndOffset.message().payload()), mapType); + ConcurrentMap<String, String> root = new ConcurrentSkipListMap<String, String>(String.CASE_INSENSITIVE_ORDER); + root.putAll(message); String tsStr = root.get(tsColName); //Preconditions.checkArgument(!StringUtils.isEmpty(tsStr), "Timestamp field " + tsColName + // //" cannot be null, the message offset is " + messageAndOffset.getOffset() + " content is " + new String(messageAndOffset.getRawData()));
