This is an automated email from the ASF dual-hosted git repository.
magang pushed a commit to branch realtime-streaming
in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/realtime-streaming by this
push:
new c406029 KYLIN-3742 Fix DataRequest for NPE and add some javadoc
c406029 is described below
commit c406029bf6fea7dc78417aa4ff20a13c092e0079
Author: hit-lacus <[email protected]>
AuthorDate: Wed Dec 26 20:37:22 2018 +0800
KYLIN-3742 Fix DataRequest for NPE and add some javadoc
---
.../apache/kylin/metadata/model/ParameterDesc.java | 14 +++++++--
.../controller/StreamingCoordinatorController.java | 34 ++++++++++++++--------
.../stream/rpc/HttpStreamDataSearchClient.java | 11 ++++---
.../kylin/stream/coordinator/Coordinator.java | 9 ++++--
.../kylin/stream/core/model/DataRequest.java | 7 +++--
.../core/storage/StreamingSegmentManager.java | 2 ++
.../apache/kylin/stream/core/util/RestService.java | 10 +++++++
.../kylin/stream/server/StreamingServer.java | 28 ++++++++++++------
.../server/rest/controller/DataController.java | 3 +-
.../{PolicyInfo.java => RetentionPolicyInfo.java} | 11 ++++++-
10 files changed, 92 insertions(+), 37 deletions(-)
diff --git
a/core-metadata/src/main/java/org/apache/kylin/metadata/model/ParameterDesc.java
b/core-metadata/src/main/java/org/apache/kylin/metadata/model/ParameterDesc.java
index f757503..45af397 100644
---
a/core-metadata/src/main/java/org/apache/kylin/metadata/model/ParameterDesc.java
+++
b/core-metadata/src/main/java/org/apache/kylin/metadata/model/ParameterDesc.java
@@ -149,8 +149,11 @@ public class ParameterDesc implements Serializable {
if (p.isColumnType()) {
if (q.isColumnType() == false)
return false;
- if (q.getColRef().equals(p.getColRef()) == false)
+ if (q.getColRef() != null &&
q.getColRef().equals(p.getColRef()) == false)
return false;
+ if (!(q.getType().equals(p.getType()) &&
q.getValue().equals(q.getValue()))) {
+ return false;
+ }
} else {
if (q.isColumnType() == true)
return false;
@@ -185,8 +188,13 @@ public class ParameterDesc implements Serializable {
@Override
public String toString() {
- String thisStr = isColumnType() ? colRef.toString() : value;
- return nextParameter == null ? thisStr : thisStr + "," +
nextParameter.toString();
+ String tmp = null;
+ if (isColumnType() && colRef != null) {
+ tmp = colRef.toString();
+ } else {
+ tmp = value;
+ }
+ return nextParameter == null ? tmp : tmp + "," +
nextParameter.toString();
}
/**
diff --git
a/server-base/src/main/java/org/apache/kylin/rest/controller/StreamingCoordinatorController.java
b/server-base/src/main/java/org/apache/kylin/rest/controller/StreamingCoordinatorController.java
index bc3886f..afb6a43 100644
---
a/server-base/src/main/java/org/apache/kylin/rest/controller/StreamingCoordinatorController.java
+++
b/server-base/src/main/java/org/apache/kylin/rest/controller/StreamingCoordinatorController.java
@@ -100,21 +100,24 @@ public class StreamingCoordinatorController extends
BasicController {
}
}
- @RequestMapping(value = "/cubes/{cubeName}/assign", method = {
RequestMethod.PUT })
+ @RequestMapping(value = "/cubes/{cubeName}/assign", method = {
RequestMethod.PUT }, produces = {
+ "application/json" })
@ResponseBody
public CoordinatorResponse assignStreamingCube(@PathVariable String
cubeName) {
streamingCoordinartorService.assignCube(cubeName);
return new CoordinatorResponse();
}
- @RequestMapping(value = "/cubes/{cubeName}/unAssign", method = {
RequestMethod.PUT })
+ @RequestMapping(value = "/cubes/{cubeName}/unAssign", method = {
RequestMethod.PUT }, produces = {
+ "application/json" })
@ResponseBody
public CoordinatorResponse unAssignStreamingCube(@PathVariable String
cubeName) {
streamingCoordinartorService.unAssignCube(cubeName);
return new CoordinatorResponse();
}
- @RequestMapping(value = "/cubes/{cubeName}/reAssign", method = {
RequestMethod.POST })
+ @RequestMapping(value = "/cubes/{cubeName}/reAssign", method = {
RequestMethod.POST }, produces = {
+ "application/json" })
@ResponseBody
public CoordinatorResponse reAssignStreamingCube(@PathVariable String
cubeName,
@RequestBody CubeAssignment newAssignments) {
@@ -122,49 +125,55 @@ public class StreamingCoordinatorController extends
BasicController {
return new CoordinatorResponse();
}
- @RequestMapping(value = "/replicaSet", method = { RequestMethod.POST })
+ @RequestMapping(value = "/replicaSet", method = { RequestMethod.POST },
produces = { "application/json" })
@ResponseBody
public CoordinatorResponse createReplicaSet(@RequestBody ReplicaSet rs) {
streamingCoordinartorService.createReplicaSet(rs);
return new CoordinatorResponse();
}
- @RequestMapping(value = "/replicaSet/{replicaSetID}", method = {
RequestMethod.DELETE })
+ @RequestMapping(value = "/replicaSet/{replicaSetID}", method = {
RequestMethod.DELETE }, produces = {
+ "application/json" })
@ResponseBody
public CoordinatorResponse deleteReplicaSet(@PathVariable Integer
replicaSetID) {
streamingCoordinartorService.removeReplicaSet(replicaSetID);
return new CoordinatorResponse();
}
- @RequestMapping(value = "/replicaSet/{replicaSetID}/{nodeID:.+}", method =
{ RequestMethod.PUT })
+ @RequestMapping(value = "/replicaSet/{replicaSetID}/{nodeID:.+}", method =
{ RequestMethod.PUT }, produces = {
+ "application/json" })
@ResponseBody
public CoordinatorResponse addNodeToReplicaSet(@PathVariable Integer
replicaSetID, @PathVariable String nodeID) {
streamingCoordinartorService.addNodeToReplicaSet(replicaSetID, nodeID);
return new CoordinatorResponse();
}
- @RequestMapping(value = "/replicaSet/{replicaSetID}/{nodeID:.+}", method =
{ RequestMethod.DELETE })
+ @RequestMapping(value = "/replicaSet/{replicaSetID}/{nodeID:.+}", method =
{ RequestMethod.DELETE }, produces = {
+ "application/json" })
@ResponseBody
- public CoordinatorResponse removeNodeFromReplicaSet(@PathVariable Integer
replicaSetID, @PathVariable String nodeID) {
+ public CoordinatorResponse removeNodeFromReplicaSet(@PathVariable Integer
replicaSetID,
+ @PathVariable String nodeID) {
streamingCoordinartorService.removeNodeFromReplicaSet(replicaSetID,
nodeID);
return new CoordinatorResponse();
}
- @RequestMapping(value = "/cubes/{cubeName}/pauseConsume", method = {
RequestMethod.PUT })
+ @RequestMapping(value = "/cubes/{cubeName}/pauseConsume", method = {
RequestMethod.PUT }, produces = {
+ "application/json" })
@ResponseBody
public CoordinatorResponse pauseCubeConsume(@PathVariable String cubeName)
{
streamingCoordinartorService.pauseConsumers(cubeName);
return new CoordinatorResponse();
}
- @RequestMapping(value = "/cubes/{cubeName}/resumeConsume", method = {
RequestMethod.PUT })
+ @RequestMapping(value = "/cubes/{cubeName}/resumeConsume", method = {
RequestMethod.PUT }, produces = {
+ "application/json" })
@ResponseBody
public CoordinatorResponse resumeCubeConsume(@PathVariable String
cubeName) {
streamingCoordinartorService.resumeConsumers(cubeName);
return new CoordinatorResponse();
}
- @RequestMapping(value = "/remoteStoreComplete", method = {
RequestMethod.POST })
+ @RequestMapping(value = "/remoteStoreComplete", method = {
RequestMethod.POST }, produces = { "application/json" })
@ResponseBody
public CoordinatorResponse segmentRemoteStoreComplete(@RequestBody
RemoteStoreCompleteRequest request) {
Pair<Long, Long> segmentRange = new Pair<>(request.getSegmentStart(),
request.getSegmentEnd());
@@ -176,7 +185,8 @@ public class StreamingCoordinatorController extends
BasicController {
return new CoordinatorResponse();
}
- @RequestMapping(value = "/replicaSetLeaderChange", method = {
RequestMethod.POST })
+ @RequestMapping(value = "/replicaSetLeaderChange", method = {
RequestMethod.POST }, produces = {
+ "application/json" })
@ResponseBody
public CoordinatorResponse replicaSetLeaderChange(@RequestBody
ReplicaSetLeaderChangeRequest request) {
logger.info("receive replicaSet leader change:" + request);
diff --git
a/storage-stream/src/main/java/org/apache/kylin/storage/stream/rpc/HttpStreamDataSearchClient.java
b/storage-stream/src/main/java/org/apache/kylin/storage/stream/rpc/HttpStreamDataSearchClient.java
index ec13485..36ae3b3 100644
---
a/storage-stream/src/main/java/org/apache/kylin/storage/stream/rpc/HttpStreamDataSearchClient.java
+++
b/storage-stream/src/main/java/org/apache/kylin/storage/stream/rpc/HttpStreamDataSearchClient.java
@@ -103,8 +103,8 @@ public class HttpStreamDataSearchClient implements
IStreamDataSearchClient {
final ResponseResultSchema schema = new ResponseResultSchema(cubeDesc,
dimensions, metrics);
final StreamingTupleConverter tupleConverter = new
StreamingTupleConverter(schema, tupleInfo);
final RecordsSerializer recordsSerializer = new
RecordsSerializer(schema);
- final DataRequest dataRequest = createDataRequest(query.getQueryId(),
cube.getName(), minSegmentTime,
- tupleInfo, tupleFilter, dimensions, groups, metrics,
storagePushDownLimit, allowStorageAggregation);
+ final DataRequest dataRequest = createDataRequest(query.getQueryId(),
cube.getName(), minSegmentTime, tupleInfo,
+ tupleFilter, dimensions, groups, metrics,
storagePushDownLimit, allowStorageAggregation);
logger.info("Query-{}:send request to stream receivers",
query.getQueryId());
for (final ReplicaSet rs : replicaSetsOfCube) {
@@ -173,9 +173,8 @@ public class HttpStreamDataSearchClient implements
IStreamDataSearchClient {
return receivers.get((receiverNo + 1) % receiversSize);
}
- public Iterator<ITuple> doSearch(DataRequest dataRequest, CubeInstance
cube,
- StreamingTupleConverter tupleConverter, RecordsSerializer
recordsSerializer, Node receiver,
- TupleInfo tupleInfo) throws Exception {
+ public Iterator<ITuple> doSearch(DataRequest dataRequest, CubeInstance
cube, StreamingTupleConverter tupleConverter,
+ RecordsSerializer recordsSerializer, Node receiver, TupleInfo
tupleInfo) throws Exception {
String queryId = dataRequest.getQueryId();
logger.info("send query to receiver " + receiver + " with query id:" +
queryId);
String url = "http://" + receiver.getHost() + ":" + receiver.getPort()
+ "/kylin/api/data/query";
@@ -235,7 +234,7 @@ public class HttpStreamDataSearchClient implements
IStreamDataSearchClient {
}
request.setGroups(groupSet);
- request.setMetrics(metrics);
+ request.setMetrics(Lists.newArrayList(metrics));
return request;
}
diff --git
a/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/Coordinator.java
b/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/Coordinator.java
index bb16dd5..f218d9d 100644
---
a/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/Coordinator.java
+++
b/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/Coordinator.java
@@ -100,9 +100,14 @@ import com.google.common.collect.Sets;
import javax.annotation.Nullable;
/**
- *
- * Each Kylin Streaming cluster has a coordinator to handle generic
assignment, membership and streaming cube state management.
+ * <pre>
+ * Each Kylin streaming cluster has at least one coordinator process/server. The
coordinator
+ * server works as the master node of the streaming cluster and handles generic
assignment,
+ * membership and streaming cube state management.
*
+ * When a cluster has several coordinator processes, only the leader tries to
answer coordinator clients'
+ * requests; the other processes become standby/candidates, so the single point of
failure is eliminated.
+ * </pre>
*/
public class Coordinator implements CoordinatorClient {
private static final Logger logger =
LoggerFactory.getLogger(Coordinator.class);
diff --git
a/stream-core/src/main/java/org/apache/kylin/stream/core/model/DataRequest.java
b/stream-core/src/main/java/org/apache/kylin/stream/core/model/DataRequest.java
index dd8b58a..07c9028 100644
---
a/stream-core/src/main/java/org/apache/kylin/stream/core/model/DataRequest.java
+++
b/stream-core/src/main/java/org/apache/kylin/stream/core/model/DataRequest.java
@@ -18,6 +18,7 @@
package org.apache.kylin.stream.core.model;
+import java.util.List;
import java.util.Set;
import org.apache.kylin.metadata.model.FunctionDesc;
@@ -30,7 +31,7 @@ public class DataRequest {
private String havingFilter;
private Set<String> dimensions; // what contains in Pair is <tableName,
columnName>
private Set<String> groups;
- private Set<FunctionDesc> metrics;
+ private List<FunctionDesc> metrics;
private int storagePushDownLimit = Integer.MAX_VALUE;
private boolean allowStorageAggregation;
@@ -78,11 +79,11 @@ public class DataRequest {
this.groups = groups;
}
- public Set<FunctionDesc> getMetrics() {
+ public List<FunctionDesc> getMetrics() {
return metrics;
}
- public void setMetrics(Set<FunctionDesc> metrics) {
+ public void setMetrics(List<FunctionDesc> metrics) {
this.metrics = metrics;
}
diff --git
a/stream-core/src/main/java/org/apache/kylin/stream/core/storage/StreamingSegmentManager.java
b/stream-core/src/main/java/org/apache/kylin/stream/core/storage/StreamingSegmentManager.java
index 0e05eaf..537f5a4 100644
---
a/stream-core/src/main/java/org/apache/kylin/stream/core/storage/StreamingSegmentManager.java
+++
b/stream-core/src/main/java/org/apache/kylin/stream/core/storage/StreamingSegmentManager.java
@@ -149,6 +149,8 @@ public class StreamingSegmentManager implements Closeable {
}
activeSegments.put(segmentStart, segment);
+ // when current active segments exceed tolerance, some
unpredictable accident may happen,
+ // but it should be configurable or computed on the fly
if (activeSegments.size() > 12) {
logger.warn("Two many active segments, segments size = " +
activeSegments.keySet());
}
diff --git
a/stream-core/src/main/java/org/apache/kylin/stream/core/util/RestService.java
b/stream-core/src/main/java/org/apache/kylin/stream/core/util/RestService.java
index 1c75460..50c4ba6 100644
---
a/stream-core/src/main/java/org/apache/kylin/stream/core/util/RestService.java
+++
b/stream-core/src/main/java/org/apache/kylin/stream/core/util/RestService.java
@@ -109,6 +109,16 @@ public class RestService {
HttpResponse response = httpClient.execute(request);
String msg = EntityUtils.toString(response.getEntity());
int code = response.getStatusLine().getStatusCode();
+ if (logger.isTraceEnabled()) {
+ String displayMessage;
+ if (msg.length() > 500) {
+ displayMessage = msg.substring(0, 500);
+ } else {
+ displayMessage = msg;
+ }
+ logger.trace("Send request: {}. And receive response[{}] which
lenght is {}, and content is {}.", code,
+ request.getRequestLine().toString(), msg.length(),
displayMessage);
+ }
if (code != 200)
throw new IOException("Invalid http response " + code + " when
send request: "
+ request.getURI().toString() + "\n" + msg);
diff --git
a/stream-receiver/src/main/java/org/apache/kylin/stream/server/StreamingServer.java
b/stream-receiver/src/main/java/org/apache/kylin/stream/server/StreamingServer.java
index 481b746..c171561 100644
---
a/stream-receiver/src/main/java/org/apache/kylin/stream/server/StreamingServer.java
+++
b/stream-receiver/src/main/java/org/apache/kylin/stream/server/StreamingServer.java
@@ -85,7 +85,7 @@ import
org.apache.kylin.stream.core.storage.columnar.ColumnarStoreCache;
import org.apache.kylin.stream.core.util.HDFSUtil;
import org.apache.kylin.stream.core.util.NamedThreadFactory;
import org.apache.kylin.stream.core.util.NodeUtil;
-import org.apache.kylin.stream.server.retention.PolicyInfo;
+import org.apache.kylin.stream.server.retention.RetentionPolicyInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -109,6 +109,9 @@ public class StreamingServer implements
ReplicaSetLeaderSelector.LeaderChangeLis
private StreamMetadataStore streamMetadataStore;
private Node currentNode;
private int replicaSetID = -1;
+ /**
+ * indicates whether the current receiver is the leader of the whole replica set
+ */
private volatile boolean isLeader = false;
private ScheduledExecutorService segmentStateCheckerExecutor;
@@ -155,21 +158,21 @@ public class StreamingServer implements
ReplicaSetLeaderSelector.LeaderChangeLis
CubeInstance cubeInstance =
segmentManager.getCubeInstance();
String cubeName = cubeInstance.getName();
try {
- PolicyInfo policyInfo = new PolicyInfo();
+ RetentionPolicyInfo retentionPolicyInfo = new
RetentionPolicyInfo();
String policyName =
cubeInstance.getConfig().getStreamingSegmentRetentionPolicy();
Map<String, String> policyProps =
cubeInstance.getConfig()
.getStreamingSegmentRetentionPolicyProperties(policyName);
- policyInfo.setName(policyName);
- policyInfo.setProperties(policyProps);
+ retentionPolicyInfo.setName(policyName);
+ retentionPolicyInfo.setProperties(policyProps);
//The returned segments that require remote persisted
are already sorted in ascending order by the segment start time
Collection<StreamingCubeSegment> segments =
segmentManager.getRequireRemotePersistSegments();
if (!segments.isEmpty()) {
logger.info("found cube {} segments:{} are
immutable, retention policy is: {}", cubeName,
- segments, policyInfo.getName());
+ segments, retentionPolicyInfo.getName());
} else {
continue;
}
- handleImmutableCubeSegments(cubeName, segmentManager,
segments, policyInfo);
+ handleImmutableCubeSegments(cubeName, segmentManager,
segments, retentionPolicyInfo);
} catch (Exception e) {
logger.error("error when handle cube:" + cubeName, e);
}
@@ -178,14 +181,21 @@ public class StreamingServer implements
ReplicaSetLeaderSelector.LeaderChangeLis
}, 60, 60, TimeUnit.SECONDS);
}
+ /**
+ * <pre>
+ * When segment status was changed to immutable, the leader of replica will
+ * try to upload local segment cache to remote, while the follower will
remove
+ * local segment cache.
+ * </pre>
+ */
private void handleImmutableCubeSegments(String cubeName,
StreamingSegmentManager segmentManager,
- Collection<StreamingCubeSegment> segments, PolicyInfo policyInfo)
throws Exception {
- if
(PolicyInfo.FULL_BUILD_POLICY.equalsIgnoreCase(policyInfo.getName())) {
+ Collection<StreamingCubeSegment> segments, RetentionPolicyInfo
retentionPolicyInfo) throws Exception {
+ if
(RetentionPolicyInfo.FULL_BUILD_POLICY.equalsIgnoreCase(retentionPolicyInfo.getName()))
{
if (isLeader) {
sendSegmentsToFullBuild(cubeName, segmentManager, segments);
}
} else {
- purgeSegments(cubeName, segments, policyInfo.getProperties());
+ purgeSegments(cubeName, segments,
retentionPolicyInfo.getProperties());
}
}
diff --git
a/stream-receiver/src/main/java/org/apache/kylin/stream/server/rest/controller/DataController.java
b/stream-receiver/src/main/java/org/apache/kylin/stream/server/rest/controller/DataController.java
index 2fdc218..45c6307 100644
---
a/stream-receiver/src/main/java/org/apache/kylin/stream/server/rest/controller/DataController.java
+++
b/stream-receiver/src/main/java/org/apache/kylin/stream/server/rest/controller/DataController.java
@@ -18,6 +18,7 @@
package org.apache.kylin.stream.server.rest.controller;
+import java.util.List;
import java.util.Set;
import org.apache.commons.codec.binary.Base64;
@@ -140,7 +141,7 @@ public class DataController extends BasicController {
}
}
- private Set<FunctionDesc> convertMetrics(CubeDesc cubeDesc,
Set<FunctionDesc> metrics) {
+ private Set<FunctionDesc> convertMetrics(CubeDesc cubeDesc,
List<FunctionDesc> metrics) {
Set<FunctionDesc> result = Sets.newHashSet();
for (FunctionDesc metric : metrics) {
result.add(findAggrFuncFromCubeDesc(cubeDesc, metric));
diff --git
a/stream-receiver/src/main/java/org/apache/kylin/stream/server/retention/PolicyInfo.java
b/stream-receiver/src/main/java/org/apache/kylin/stream/server/retention/RetentionPolicyInfo.java
similarity index 87%
rename from
stream-receiver/src/main/java/org/apache/kylin/stream/server/retention/PolicyInfo.java
rename to
stream-receiver/src/main/java/org/apache/kylin/stream/server/retention/RetentionPolicyInfo.java
index 08b3a6c..0320032 100644
---
a/stream-receiver/src/main/java/org/apache/kylin/stream/server/retention/PolicyInfo.java
+++
b/stream-receiver/src/main/java/org/apache/kylin/stream/server/retention/RetentionPolicyInfo.java
@@ -22,8 +22,17 @@ import java.util.Map;
import com.google.common.collect.Maps;
-public class PolicyInfo {
+/**
+ * Retention policy for local segment cache
+ */
+public class RetentionPolicyInfo {
+ /**
+ * outdated data will be dropped
+ */
public static final String PURGE_POLICY = "purge";
+ /**
+ * when data becomes immutable, it will be persisted remotely
+ */
public static final String FULL_BUILD_POLICY = "fullBuild";
private String name;