Repository: eagle Updated Branches: refs/heads/master f20bf0eb3 -> ba79b5485
[EAGLE-1059] fix a bug in PolicyResource.java https://issues.apache.org/jira/browse/EAGLE-1059 Author: Zhao, Qingwen <[email protected]> Closes #965 from qingwen220/EAGLE-1059. Project: http://git-wip-us.apache.org/repos/asf/eagle/repo Commit: http://git-wip-us.apache.org/repos/asf/eagle/commit/ba79b548 Tree: http://git-wip-us.apache.org/repos/asf/eagle/tree/ba79b548 Diff: http://git-wip-us.apache.org/repos/asf/eagle/diff/ba79b548 Branch: refs/heads/master Commit: ba79b5485622b9de92373d612631d1b96a245287 Parents: f20bf0e Author: Zhao, Qingwen <[email protected]> Authored: Mon Aug 7 10:56:55 2017 +0800 Committer: Zhao, Qingwen <[email protected]> Committed: Mon Aug 7 10:56:55 2017 +0800 ---------------------------------------------------------------------- .../impl/AlertBoltOutputCollectorWrapper.java | 38 ++++++++------ .../eagle/metadata/resource/PolicyResource.java | 24 +++++++++ .../metadata/utils/PolicyIdConversions.java | 20 ++++--- .../dev/partials/alert/policyPrototypes.html | 55 ++++++++++---------- .../app/dev/public/js/ctrls/alertEditCtrl.js | 37 +++++++------ 5 files changed, 103 insertions(+), 71 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/eagle/blob/ba79b548/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/AlertBoltOutputCollectorWrapper.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/AlertBoltOutputCollectorWrapper.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/AlertBoltOutputCollectorWrapper.java index 606ddce..4c749f4 100755 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/AlertBoltOutputCollectorWrapper.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/AlertBoltOutputCollectorWrapper.java @@ -49,28 +49,32 @@ public class AlertBoltOutputCollectorWrapper implements AlertStreamCollector { @Override public void emit(AlertStreamEvent event) { + if (event == null) { + return; + } + event.ensureAlertId(); Set<PublishPartition> clonedPublishPartitions = new HashSet<>(publishPartitions); for (PublishPartition publishPartition : clonedPublishPartitions) { // skip the publish partition which is not belong to this policy and also check streamId PublishPartition cloned = publishPartition.clone(); Optional.ofNullable(event) - .filter(x -> x != null - && x.getSchema() != null - && cloned.getPolicyId().equalsIgnoreCase(x.getPolicyId()) - && (cloned.getStreamId().equalsIgnoreCase(x.getSchema().getStreamId()) - || cloned.getStreamId().equalsIgnoreCase(Publishment.STREAM_NAME_DEFAULT))) - .ifPresent(x -> { - cloned.getColumns().stream() - .filter(y -> event.getSchema().getColumnIndex(y) >= 0 - && event.getSchema().getColumnIndex(y) < event.getSchema().getColumns().size()) - .map(y -> event.getData()[event.getSchema().getColumnIndex(y)]) - .filter(y -> y != null) - .forEach(y -> cloned.getColumnValues().add(y)); - synchronized (outputLock) { - streamContext.counter().incr("alert_count"); - delegate.emit(Arrays.asList(cloned, event)); - } - }); + .filter(x -> x != null + && x.getSchema() != null + && cloned.getPolicyId().equalsIgnoreCase(x.getPolicyId()) + && (cloned.getStreamId().equalsIgnoreCase(x.getSchema().getStreamId()) + || cloned.getStreamId().equalsIgnoreCase(Publishment.STREAM_NAME_DEFAULT))) + .ifPresent(x -> { + cloned.getColumns().stream() + .filter(y -> event.getSchema().getColumnIndex(y) >= 0 + && event.getSchema().getColumnIndex(y) < event.getSchema().getColumns().size()) + .map(y -> event.getData()[event.getSchema().getColumnIndex(y)]) + .filter(y -> y != null) + .forEach(y -> cloned.getColumnValues().add(y)); + synchronized (outputLock) { + streamContext.counter().incr("alert_count"); + delegate.emit(Arrays.asList(cloned, event)); + } + }); } } http://git-wip-us.apache.org/repos/asf/eagle/blob/ba79b548/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/resource/PolicyResource.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/resource/PolicyResource.java b/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/resource/PolicyResource.java index d09da4b..9edfc3e 100644 --- a/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/resource/PolicyResource.java +++ b/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/resource/PolicyResource.java @@ -21,6 +21,7 @@ import com.google.common.base.Preconditions; import com.google.inject.Inject; import org.apache.eagle.alert.engine.coordinator.PolicyDefinition; import org.apache.eagle.alert.engine.coordinator.Publishment; +import org.apache.eagle.alert.engine.coordinator.StreamPartition; import org.apache.eagle.alert.engine.interpreter.PolicyValidationResult; import org.apache.eagle.alert.metadata.resource.OpResult; import org.apache.eagle.common.rest.RESTResponse; @@ -72,6 +73,7 @@ public class PolicyResource { Preconditions.checkNotNull(policyEntity.getAlertPublishmentIds(), "alert publisher list should not be null"); PolicyDefinition policyDefinition = policyEntity.getDefinition(); + checkOutputStream(policyDefinition.getInputStreams(), policyDefinition.getOutputStreams()); OpResult result = metadataResource.addPolicy(policyDefinition); if (result.code != 200) { throw new IllegalArgumentException(result.message); @@ -153,6 +155,7 @@ public class PolicyResource { private PolicyEntity importPolicyProto(PolicyEntity policyEntity) { PolicyDefinition policyDefinition = policyEntity.getDefinition(); + checkOutputStream(policyDefinition.getInputStreams(), policyDefinition.getOutputStreams()); List<String> inputStreamType = new ArrayList<>(); String newDefinition = policyDefinition.getDefinition().getValue(); for (String inputStream : policyDefinition.getInputStreams()) { @@ -164,6 +167,7 @@ public class PolicyResource { policyDefinition.getDefinition().setValue(newDefinition); policyDefinition.setName(PolicyIdConversions.parsePolicyId(policyDefinition.getSiteId(), policyDefinition.getName())); policyDefinition.setSiteId(null); + policyDefinition.getPartitionSpec().clear(); policyEntity.setDefinition(policyDefinition); return policyEntityService.createOrUpdatePolicyProto(policyEntity); } @@ -186,10 +190,19 @@ public class PolicyResource { policyDefinition.getDefinition().setValue(newDefinition); policyDefinition.setSiteId(site); policyDefinition.setName(PolicyIdConversions.generateUniquePolicyId(site, policyProto.getDefinition().getName())); + PolicyValidationResult validationResult = metadataResource.validatePolicy(policyDefinition); if (!validationResult.isSuccess() || validationResult.getException() != null) { throw new IllegalArgumentException(validationResult.getException()); } + + policyDefinition.getPartitionSpec().clear(); + for (StreamPartition sd : validationResult.getPolicyExecutionPlan().getStreamPartitions()) { + if (inputStreams.contains(sd.getStreamId())) { + policyDefinition.getPartitionSpec().add(sd); + } + } + OpResult result = metadataResource.addPolicy(policyDefinition); if (result.code != 200) { throw new IllegalArgumentException("fail to create policy: " + result.message); @@ -204,4 +217,15 @@ public class PolicyResource { return true; } + private void checkOutputStream(List<String> inputStreams, List<String> outputStreams) { + for (String inputStream : inputStreams) { + for (String outputStream : outputStreams) { + if (outputStream.contains(inputStream)) { + throw new IllegalArgumentException("OutputStream name should not contains string: " + inputStream + + ". Please rename your OutputStream name"); + } + } + } + } + } http://git-wip-us.apache.org/repos/asf/eagle/blob/ba79b548/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/utils/PolicyIdConversions.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/utils/PolicyIdConversions.java b/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/utils/PolicyIdConversions.java index c9ccadc..2012a40 100644 --- a/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/utils/PolicyIdConversions.java +++ b/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/utils/PolicyIdConversions.java @@ -22,17 +22,21 @@ import com.google.common.base.Preconditions; public class PolicyIdConversions { public static String generateUniquePolicyId(String siteId, String policyName) { - return String.format("%s_%s", policyName, siteId); + String subffix = String.format("_%s", siteId.toLowerCase()); + if (policyName.toLowerCase().endsWith(subffix)) { + return policyName; + } + return String.format("%s_%s", policyName, siteId.toLowerCase()); } - public static String parsePolicyId(String siteId, String generatedUniquePolicyId) { - String subffix = String.format("_%s", siteId); - if (generatedUniquePolicyId.endsWith(subffix)) { - int streamTypeIdLength = generatedUniquePolicyId.length() - subffix.length(); - Preconditions.checkArgument(streamTypeIdLength > 0, "Invalid policyId: " + generatedUniquePolicyId + ", policyId is empty"); - return generatedUniquePolicyId.substring(0, streamTypeIdLength); + public static String parsePolicyId(String siteId, String policyName) { + String subffix = String.format("_%s", siteId.toLowerCase()); + if (policyName.toLowerCase().endsWith(subffix)) { + int streamTypeIdLength = policyName.length() - subffix.length(); + Preconditions.checkArgument(streamTypeIdLength > 0, "Invalid policyId: " + policyName + ", policyId is empty"); + return policyName.substring(0, streamTypeIdLength); } else { - return generatedUniquePolicyId; + return policyName; } } } http://git-wip-us.apache.org/repos/asf/eagle/blob/ba79b548/eagle-server/src/main/webapp/app/dev/partials/alert/policyPrototypes.html ---------------------------------------------------------------------- diff --git a/eagle-server/src/main/webapp/app/dev/partials/alert/policyPrototypes.html b/eagle-server/src/main/webapp/app/dev/partials/alert/policyPrototypes.html index 9f22ce4..0798475 100644 --- a/eagle-server/src/main/webapp/app/dev/partials/alert/policyPrototypes.html +++ b/eagle-server/src/main/webapp/app/dev/partials/alert/policyPrototypes.html @@ -27,35 +27,36 @@ <div sort-table="prototypeList"> <table class="table table-bordered table-hover"> <thead> - <tr> - <th> - <input type="checkbox" ng-checked="getCheckedList().length === prototypeList.length && prototypeList.length !== 0" ng-click="doCheckAll()" /> - </th> - <th>Name</th> - <th>Definition</th> - <th>Publishers</th> - <th width="110">Operation</th> - </tr> + <tr> + <th> + <input type="checkbox" ng-checked="getCheckedList().length === prototypeList.length && prototypeList.length !== 0" ng-click="doCheckAll()" /> + </th> + <th>Name</th> + <th>Definition</th> + <!--<th>Publishers</th> --> + <th width="110">Operation</th> + </tr> </thead> <tbody> - <tr> - <td> - <input type="checkbox" ng-checked="checkedPrototypes[item.name]" ng-click="checkedPrototypes[item.name] = !checkedPrototypes[item.name]" /> - </td> - <td>{{item.name}}</td> - <td><pre>{{item.definition.definition.value}}</pre></td> - <td> - <ul class="no-margin"> - <li ng-repeat="publisher in item.alertPublishmentIds track by $index"> - {{publisher}} - </li> - </ul> - </td> - <td class="text-center"> - <button class="btn btn-xs btn-primary" ng-click="createPolicy([item])">Export</button> - <button class="btn btn-xs btn-danger" ng-click="deletePrototype(item)">Delete</button> - </td> - </tr> + <tr> + <td> + <input type="checkbox" ng-checked="checkedPrototypes[item.name]" ng-click="checkedPrototypes[item.name] = !checkedPrototypes[item.name]" /> + </td> + <td>{{item.name}}</td> + <td><pre>{{item.definition.definition.value}}</pre></td> + <!-- + <td> + <ul class="no-margin"> + <li ng-repeat="publisher in item.alertPublishmentIds track by $index"> + {{publisher}} + </li> + </ul> + </td> --> + <td class="text-center"> + <button class="btn btn-xs btn-primary" ng-click="createPolicy([item])">Export</button> + <button class="btn btn-xs btn-danger" ng-click="deletePrototype(item)">Delete</button> + </td> + </tr> </tbody> </table> </div> http://git-wip-us.apache.org/repos/asf/eagle/blob/ba79b548/eagle-server/src/main/webapp/app/dev/public/js/ctrls/alertEditCtrl.js ---------------------------------------------------------------------- diff --git a/eagle-server/src/main/webapp/app/dev/public/js/ctrls/alertEditCtrl.js b/eagle-server/src/main/webapp/app/dev/public/js/ctrls/alertEditCtrl.js index e4623b2..fc2b494 100644 --- a/eagle-server/src/main/webapp/app/dev/public/js/ctrls/alertEditCtrl.js +++ b/eagle-server/src/main/webapp/app/dev/public/js/ctrls/alertEditCtrl.js @@ -253,12 +253,12 @@ }; /*$scope.checkInputStream = function (streamId) { - if($scope.isInputStreamSelected(streamId)) { - $scope.policy.inputStreams = common.array.remove(streamId, $scope.policy.inputStreams); - } else { - $scope.policy.inputStreams.push(streamId); - } - };*/ + if($scope.isInputStreamSelected(streamId)) { + $scope.policy.inputStreams = common.array.remove(streamId, $scope.policy.inputStreams); + } else { + $scope.policy.inputStreams.push(streamId); + } + };*/ // ============================================================== // = Definition = @@ -558,7 +558,18 @@ }); } - policyPromise._then(function () { + policyPromise._then(function (res) { + var validate = res.data; + if (!validate.success) { + $.dialog({ + title: "OPS", + content: "Create policy failed: " + (res.data.message || res.data.errors) + }); + $scope.policyLock = false; + $scope.saveLock = false; + return; + } + console.log("Create policy success..."); $.dialog({ title: "Done", @@ -566,18 +577,6 @@ }, function () { $wrapState.go("policyDetail", {name: $scope.policy.name, siteId: $scope.policy.siteId}); }); - }, function (res) { - var errormsg = ""; - if(typeof res.data.message !== 'undefined') { - errormsg = res.data.message; - } else { - errormsg = res.data.errors; - } - $.dialog({ - title: "OPS", - content: "Create policy failed: " + errormsg - }); - $scope.policyLock = false; }); }, function (args) { $.dialog({
