This is an automated email from the ASF dual-hosted git repository. magang pushed a commit to branch realtime-streaming in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/realtime-streaming by this push: new c406029 KYLIN-3742 Fix DataRequest for NPE and add some javadoc c406029 is described below commit c406029bf6fea7dc78417aa4ff20a13c092e0079 Author: hit-lacus <hit_la...@126.com> AuthorDate: Wed Dec 26 20:37:22 2018 +0800 KYLIN-3742 Fix DataRequest for NPE and add some javadoc --- .../apache/kylin/metadata/model/ParameterDesc.java | 14 +++++++-- .../controller/StreamingCoordinatorController.java | 34 ++++++++++++++-------- .../stream/rpc/HttpStreamDataSearchClient.java | 11 ++++--- .../kylin/stream/coordinator/Coordinator.java | 9 ++++-- .../kylin/stream/core/model/DataRequest.java | 7 +++-- .../core/storage/StreamingSegmentManager.java | 2 ++ .../apache/kylin/stream/core/util/RestService.java | 10 +++++++ .../kylin/stream/server/StreamingServer.java | 28 ++++++++++++------ .../server/rest/controller/DataController.java | 3 +- .../{PolicyInfo.java => RetentionPolicyInfo.java} | 11 ++++++- 10 files changed, 92 insertions(+), 37 deletions(-) diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/ParameterDesc.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/ParameterDesc.java index f757503..45af397 100644 --- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/ParameterDesc.java +++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/ParameterDesc.java @@ -149,8 +149,11 @@ public class ParameterDesc implements Serializable { if (p.isColumnType()) { if (q.isColumnType() == false) return false; - if (q.getColRef().equals(p.getColRef()) == false) + if (q.getColRef() != null && q.getColRef().equals(p.getColRef()) == false) return false; + if (!(q.getType().equals(p.getType()) && q.getValue().equals(q.getValue()))) { + return false; + } } else { if (q.isColumnType() == true) return false; @@ -185,8 +188,13 @@ public class ParameterDesc implements Serializable { @Override public String toString() { - String thisStr = isColumnType() ? 
colRef.toString() : value; - return nextParameter == null ? thisStr : thisStr + "," + nextParameter.toString(); + String tmp = null; + if (isColumnType() && colRef != null) { + tmp = colRef.toString(); + } else { + tmp = value; + } + return nextParameter == null ? tmp : tmp + "," + nextParameter.toString(); } /** diff --git a/server-base/src/main/java/org/apache/kylin/rest/controller/StreamingCoordinatorController.java b/server-base/src/main/java/org/apache/kylin/rest/controller/StreamingCoordinatorController.java index bc3886f..afb6a43 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/controller/StreamingCoordinatorController.java +++ b/server-base/src/main/java/org/apache/kylin/rest/controller/StreamingCoordinatorController.java @@ -100,21 +100,24 @@ public class StreamingCoordinatorController extends BasicController { } } - @RequestMapping(value = "/cubes/{cubeName}/assign", method = { RequestMethod.PUT }) + @RequestMapping(value = "/cubes/{cubeName}/assign", method = { RequestMethod.PUT }, produces = { + "application/json" }) @ResponseBody public CoordinatorResponse assignStreamingCube(@PathVariable String cubeName) { streamingCoordinartorService.assignCube(cubeName); return new CoordinatorResponse(); } - @RequestMapping(value = "/cubes/{cubeName}/unAssign", method = { RequestMethod.PUT }) + @RequestMapping(value = "/cubes/{cubeName}/unAssign", method = { RequestMethod.PUT }, produces = { + "application/json" }) @ResponseBody public CoordinatorResponse unAssignStreamingCube(@PathVariable String cubeName) { streamingCoordinartorService.unAssignCube(cubeName); return new CoordinatorResponse(); } - @RequestMapping(value = "/cubes/{cubeName}/reAssign", method = { RequestMethod.POST }) + @RequestMapping(value = "/cubes/{cubeName}/reAssign", method = { RequestMethod.POST }, produces = { + "application/json" }) @ResponseBody public CoordinatorResponse reAssignStreamingCube(@PathVariable String cubeName, @RequestBody CubeAssignment newAssignments) { @@ 
-122,49 +125,55 @@ public class StreamingCoordinatorController extends BasicController { return new CoordinatorResponse(); } - @RequestMapping(value = "/replicaSet", method = { RequestMethod.POST }) + @RequestMapping(value = "/replicaSet", method = { RequestMethod.POST }, produces = { "application/json" }) @ResponseBody public CoordinatorResponse createReplicaSet(@RequestBody ReplicaSet rs) { streamingCoordinartorService.createReplicaSet(rs); return new CoordinatorResponse(); } - @RequestMapping(value = "/replicaSet/{replicaSetID}", method = { RequestMethod.DELETE }) + @RequestMapping(value = "/replicaSet/{replicaSetID}", method = { RequestMethod.DELETE }, produces = { + "application/json" }) @ResponseBody public CoordinatorResponse deleteReplicaSet(@PathVariable Integer replicaSetID) { streamingCoordinartorService.removeReplicaSet(replicaSetID); return new CoordinatorResponse(); } - @RequestMapping(value = "/replicaSet/{replicaSetID}/{nodeID:.+}", method = { RequestMethod.PUT }) + @RequestMapping(value = "/replicaSet/{replicaSetID}/{nodeID:.+}", method = { RequestMethod.PUT }, produces = { + "application/json" }) @ResponseBody public CoordinatorResponse addNodeToReplicaSet(@PathVariable Integer replicaSetID, @PathVariable String nodeID) { streamingCoordinartorService.addNodeToReplicaSet(replicaSetID, nodeID); return new CoordinatorResponse(); } - @RequestMapping(value = "/replicaSet/{replicaSetID}/{nodeID:.+}", method = { RequestMethod.DELETE }) + @RequestMapping(value = "/replicaSet/{replicaSetID}/{nodeID:.+}", method = { RequestMethod.DELETE }, produces = { + "application/json" }) @ResponseBody - public CoordinatorResponse removeNodeFromReplicaSet(@PathVariable Integer replicaSetID, @PathVariable String nodeID) { + public CoordinatorResponse removeNodeFromReplicaSet(@PathVariable Integer replicaSetID, + @PathVariable String nodeID) { streamingCoordinartorService.removeNodeFromReplicaSet(replicaSetID, nodeID); return new CoordinatorResponse(); } - 
@RequestMapping(value = "/cubes/{cubeName}/pauseConsume", method = { RequestMethod.PUT }) + @RequestMapping(value = "/cubes/{cubeName}/pauseConsume", method = { RequestMethod.PUT }, produces = { + "application/json" }) @ResponseBody public CoordinatorResponse pauseCubeConsume(@PathVariable String cubeName) { streamingCoordinartorService.pauseConsumers(cubeName); return new CoordinatorResponse(); } - @RequestMapping(value = "/cubes/{cubeName}/resumeConsume", method = { RequestMethod.PUT }) + @RequestMapping(value = "/cubes/{cubeName}/resumeConsume", method = { RequestMethod.PUT }, produces = { + "application/json" }) @ResponseBody public CoordinatorResponse resumeCubeConsume(@PathVariable String cubeName) { streamingCoordinartorService.resumeConsumers(cubeName); return new CoordinatorResponse(); } - @RequestMapping(value = "/remoteStoreComplete", method = { RequestMethod.POST }) + @RequestMapping(value = "/remoteStoreComplete", method = { RequestMethod.POST }, produces = { "application/json" }) @ResponseBody public CoordinatorResponse segmentRemoteStoreComplete(@RequestBody RemoteStoreCompleteRequest request) { Pair<Long, Long> segmentRange = new Pair<>(request.getSegmentStart(), request.getSegmentEnd()); @@ -176,7 +185,8 @@ public class StreamingCoordinatorController extends BasicController { return new CoordinatorResponse(); } - @RequestMapping(value = "/replicaSetLeaderChange", method = { RequestMethod.POST }) + @RequestMapping(value = "/replicaSetLeaderChange", method = { RequestMethod.POST }, produces = { + "application/json" }) @ResponseBody public CoordinatorResponse replicaSetLeaderChange(@RequestBody ReplicaSetLeaderChangeRequest request) { logger.info("receive replicaSet leader change:" + request); diff --git a/storage-stream/src/main/java/org/apache/kylin/storage/stream/rpc/HttpStreamDataSearchClient.java b/storage-stream/src/main/java/org/apache/kylin/storage/stream/rpc/HttpStreamDataSearchClient.java index ec13485..36ae3b3 100644 --- 
a/storage-stream/src/main/java/org/apache/kylin/storage/stream/rpc/HttpStreamDataSearchClient.java +++ b/storage-stream/src/main/java/org/apache/kylin/storage/stream/rpc/HttpStreamDataSearchClient.java @@ -103,8 +103,8 @@ public class HttpStreamDataSearchClient implements IStreamDataSearchClient { final ResponseResultSchema schema = new ResponseResultSchema(cubeDesc, dimensions, metrics); final StreamingTupleConverter tupleConverter = new StreamingTupleConverter(schema, tupleInfo); final RecordsSerializer recordsSerializer = new RecordsSerializer(schema); - final DataRequest dataRequest = createDataRequest(query.getQueryId(), cube.getName(), minSegmentTime, - tupleInfo, tupleFilter, dimensions, groups, metrics, storagePushDownLimit, allowStorageAggregation); + final DataRequest dataRequest = createDataRequest(query.getQueryId(), cube.getName(), minSegmentTime, tupleInfo, + tupleFilter, dimensions, groups, metrics, storagePushDownLimit, allowStorageAggregation); logger.info("Query-{}:send request to stream receivers", query.getQueryId()); for (final ReplicaSet rs : replicaSetsOfCube) { @@ -173,9 +173,8 @@ public class HttpStreamDataSearchClient implements IStreamDataSearchClient { return receivers.get((receiverNo + 1) % receiversSize); } - public Iterator<ITuple> doSearch(DataRequest dataRequest, CubeInstance cube, - StreamingTupleConverter tupleConverter, RecordsSerializer recordsSerializer, Node receiver, - TupleInfo tupleInfo) throws Exception { + public Iterator<ITuple> doSearch(DataRequest dataRequest, CubeInstance cube, StreamingTupleConverter tupleConverter, + RecordsSerializer recordsSerializer, Node receiver, TupleInfo tupleInfo) throws Exception { String queryId = dataRequest.getQueryId(); logger.info("send query to receiver " + receiver + " with query id:" + queryId); String url = "http://" + receiver.getHost() + ":" + receiver.getPort() + "/kylin/api/data/query"; @@ -235,7 +234,7 @@ public class HttpStreamDataSearchClient implements 
IStreamDataSearchClient { } request.setGroups(groupSet); - request.setMetrics(metrics); + request.setMetrics(Lists.newArrayList(metrics)); return request; } diff --git a/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/Coordinator.java b/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/Coordinator.java index bb16dd5..f218d9d 100644 --- a/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/Coordinator.java +++ b/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/Coordinator.java @@ -100,9 +100,14 @@ import com.google.common.collect.Sets; import javax.annotation.Nullable; /** - * - * Each Kylin Streaming cluster has a coordinator to handle generic assignment, membership and streaming cube state management. + * <pre> + * Each Kylin streaming cluster has at least one coordinator process/server; the coordinator + * server works as the master node of the streaming cluster and handles generic assignment, + * membership and streaming cube state management. * + * When a cluster has several coordinator processes, only the leader tries to answer coordinator clients' + * requests; the other processes become standby/candidate, so the single point of failure is eliminated.
+ * </pre> */ public class Coordinator implements CoordinatorClient { private static final Logger logger = LoggerFactory.getLogger(Coordinator.class); diff --git a/stream-core/src/main/java/org/apache/kylin/stream/core/model/DataRequest.java b/stream-core/src/main/java/org/apache/kylin/stream/core/model/DataRequest.java index dd8b58a..07c9028 100644 --- a/stream-core/src/main/java/org/apache/kylin/stream/core/model/DataRequest.java +++ b/stream-core/src/main/java/org/apache/kylin/stream/core/model/DataRequest.java @@ -18,6 +18,7 @@ package org.apache.kylin.stream.core.model; +import java.util.List; import java.util.Set; import org.apache.kylin.metadata.model.FunctionDesc; @@ -30,7 +31,7 @@ public class DataRequest { private String havingFilter; private Set<String> dimensions; // what contains in Pair is <tableName, columnName> private Set<String> groups; - private Set<FunctionDesc> metrics; + private List<FunctionDesc> metrics; private int storagePushDownLimit = Integer.MAX_VALUE; private boolean allowStorageAggregation; @@ -78,11 +79,11 @@ public class DataRequest { this.groups = groups; } - public Set<FunctionDesc> getMetrics() { + public List<FunctionDesc> getMetrics() { return metrics; } - public void setMetrics(Set<FunctionDesc> metrics) { + public void setMetrics(List<FunctionDesc> metrics) { this.metrics = metrics; } diff --git a/stream-core/src/main/java/org/apache/kylin/stream/core/storage/StreamingSegmentManager.java b/stream-core/src/main/java/org/apache/kylin/stream/core/storage/StreamingSegmentManager.java index 0e05eaf..537f5a4 100644 --- a/stream-core/src/main/java/org/apache/kylin/stream/core/storage/StreamingSegmentManager.java +++ b/stream-core/src/main/java/org/apache/kylin/stream/core/storage/StreamingSegmentManager.java @@ -149,6 +149,8 @@ public class StreamingSegmentManager implements Closeable { } activeSegments.put(segmentStart, segment); + // when current active segments exceed tolerance, some unpredictable accident may happen, + // but
it should be configurable or computed on the fly if (activeSegments.size() > 12) { logger.warn("Two many active segments, segments size = " + activeSegments.keySet()); } diff --git a/stream-core/src/main/java/org/apache/kylin/stream/core/util/RestService.java b/stream-core/src/main/java/org/apache/kylin/stream/core/util/RestService.java index 1c75460..50c4ba6 100644 --- a/stream-core/src/main/java/org/apache/kylin/stream/core/util/RestService.java +++ b/stream-core/src/main/java/org/apache/kylin/stream/core/util/RestService.java @@ -109,6 +109,16 @@ public class RestService { HttpResponse response = httpClient.execute(request); String msg = EntityUtils.toString(response.getEntity()); int code = response.getStatusLine().getStatusCode(); + if (logger.isTraceEnabled()) { + String displayMessage; + if (msg.length() > 500) { + displayMessage = msg.substring(0, 500); + } else { + displayMessage = msg; + } + logger.trace("Send request: {}. And receive response[{}] which lenght is {}, and content is {}.", code, + request.getRequestLine().toString(), msg.length(), displayMessage); + } if (code != 200) throw new IOException("Invalid http response " + code + " when send request: " + request.getURI().toString() + "\n" + msg); diff --git a/stream-receiver/src/main/java/org/apache/kylin/stream/server/StreamingServer.java b/stream-receiver/src/main/java/org/apache/kylin/stream/server/StreamingServer.java index 481b746..c171561 100644 --- a/stream-receiver/src/main/java/org/apache/kylin/stream/server/StreamingServer.java +++ b/stream-receiver/src/main/java/org/apache/kylin/stream/server/StreamingServer.java @@ -85,7 +85,7 @@ import org.apache.kylin.stream.core.storage.columnar.ColumnarStoreCache; import org.apache.kylin.stream.core.util.HDFSUtil; import org.apache.kylin.stream.core.util.NamedThreadFactory; import org.apache.kylin.stream.core.util.NodeUtil; -import org.apache.kylin.stream.server.retention.PolicyInfo; +import
org.apache.kylin.stream.server.retention.RetentionPolicyInfo; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -109,6 +109,9 @@ public class StreamingServer implements ReplicaSetLeaderSelector.LeaderChangeLis private StreamMetadataStore streamMetadataStore; private Node currentNode; private int replicaSetID = -1; + /** + * indicate whether current receiver is the leader of whole replica set + */ private volatile boolean isLeader = false; private ScheduledExecutorService segmentStateCheckerExecutor; @@ -155,21 +158,21 @@ public class StreamingServer implements ReplicaSetLeaderSelector.LeaderChangeLis CubeInstance cubeInstance = segmentManager.getCubeInstance(); String cubeName = cubeInstance.getName(); try { - PolicyInfo policyInfo = new PolicyInfo(); + RetentionPolicyInfo retentionPolicyInfo = new RetentionPolicyInfo(); String policyName = cubeInstance.getConfig().getStreamingSegmentRetentionPolicy(); Map<String, String> policyProps = cubeInstance.getConfig() .getStreamingSegmentRetentionPolicyProperties(policyName); - policyInfo.setName(policyName); - policyInfo.setProperties(policyProps); + retentionPolicyInfo.setName(policyName); + retentionPolicyInfo.setProperties(policyProps); //The returned segments that require remote persisted are already sorted in ascending order by the segment start time Collection<StreamingCubeSegment> segments = segmentManager.getRequireRemotePersistSegments(); if (!segments.isEmpty()) { logger.info("found cube {} segments:{} are immutable, retention policy is: {}", cubeName, - segments, policyInfo.getName()); + segments, retentionPolicyInfo.getName()); } else { continue; } - handleImmutableCubeSegments(cubeName, segmentManager, segments, policyInfo); + handleImmutableCubeSegments(cubeName, segmentManager, segments, retentionPolicyInfo); } catch (Exception e) { logger.error("error when handle cube:" + cubeName, e); } @@ -178,14 +181,21 @@ public class StreamingServer implements ReplicaSetLeaderSelector.LeaderChangeLis }, 60, 
60, TimeUnit.SECONDS); } + /** + * <pre> + * When segment status was changed to immutable, the leader of replica will + * try to upload local segment cache to remote, while the follower will remove + * local segment cache. + * </pre> + */ private void handleImmutableCubeSegments(String cubeName, StreamingSegmentManager segmentManager, - Collection<StreamingCubeSegment> segments, PolicyInfo policyInfo) throws Exception { - if (PolicyInfo.FULL_BUILD_POLICY.equalsIgnoreCase(policyInfo.getName())) { + Collection<StreamingCubeSegment> segments, RetentionPolicyInfo retentionPolicyInfo) throws Exception { + if (RetentionPolicyInfo.FULL_BUILD_POLICY.equalsIgnoreCase(retentionPolicyInfo.getName())) { if (isLeader) { sendSegmentsToFullBuild(cubeName, segmentManager, segments); } } else { - purgeSegments(cubeName, segments, policyInfo.getProperties()); + purgeSegments(cubeName, segments, retentionPolicyInfo.getProperties()); } } diff --git a/stream-receiver/src/main/java/org/apache/kylin/stream/server/rest/controller/DataController.java b/stream-receiver/src/main/java/org/apache/kylin/stream/server/rest/controller/DataController.java index 2fdc218..45c6307 100644 --- a/stream-receiver/src/main/java/org/apache/kylin/stream/server/rest/controller/DataController.java +++ b/stream-receiver/src/main/java/org/apache/kylin/stream/server/rest/controller/DataController.java @@ -18,6 +18,7 @@ package org.apache.kylin.stream.server.rest.controller; +import java.util.List; import java.util.Set; import org.apache.commons.codec.binary.Base64; @@ -140,7 +141,7 @@ public class DataController extends BasicController { } } - private Set<FunctionDesc> convertMetrics(CubeDesc cubeDesc, Set<FunctionDesc> metrics) { + private Set<FunctionDesc> convertMetrics(CubeDesc cubeDesc, List<FunctionDesc> metrics) { Set<FunctionDesc> result = Sets.newHashSet(); for (FunctionDesc metric : metrics) { result.add(findAggrFuncFromCubeDesc(cubeDesc, metric)); diff --git 
a/stream-receiver/src/main/java/org/apache/kylin/stream/server/retention/PolicyInfo.java b/stream-receiver/src/main/java/org/apache/kylin/stream/server/retention/RetentionPolicyInfo.java similarity index 87% rename from stream-receiver/src/main/java/org/apache/kylin/stream/server/retention/PolicyInfo.java rename to stream-receiver/src/main/java/org/apache/kylin/stream/server/retention/RetentionPolicyInfo.java index 08b3a6c..0320032 100644 --- a/stream-receiver/src/main/java/org/apache/kylin/stream/server/retention/PolicyInfo.java +++ b/stream-receiver/src/main/java/org/apache/kylin/stream/server/retention/RetentionPolicyInfo.java @@ -22,8 +22,17 @@ import java.util.Map; import com.google.common.collect.Maps; -public class PolicyInfo { +/** + * Retention policy for local segment cache + */ +public class RetentionPolicyInfo { + /** + * outdated data will be dropped + */ public static final String PURGE_POLICY = "purge"; + /** + * when data becomes immutable, it will be persisted remotely + */ public static final String FULL_BUILD_POLICY = "fullBuild"; private String name;