http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/StreamWindowRepository.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/StreamWindowRepository.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/StreamWindowRepository.java deleted file mode 100755 index 7a503e1..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/StreamWindowRepository.java +++ /dev/null @@ -1,252 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
package org.apache.eagle.alert.engine.sorter;

import org.apache.eagle.alert.engine.sorter.impl.StreamSortedWindowInMapDB;
import org.apache.eagle.alert.engine.sorter.impl.StreamSortedWindowOnHeap;
import com.google.common.base.Preconditions;
import org.mapdb.DB;
import org.mapdb.DBMaker;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

/**
 * Singleton factory for {@link StreamWindow} instances, pooling one MapDB
 * {@link DB} per {@link StorageType}.
 *
 * <p>Benchmark summary (1k..1M events): ONHEAP insert/flush is one to two
 * orders of magnitude faster than the serializing stores; DIRECT_MEMORY and
 * MEMORY are comparable; FILE_RAF is by far the slowest on insert
 * (e.g. 1M inserts: ONHEAP 2s, DIRECT_MEMORY 44s, MEMORY 44s, FILE_RAF 464s).</p>
 *
 * @see StreamSortedWindowOnHeap
 * @see org.mapdb.DBMaker
 */
public class StreamWindowRepository {
    public enum StorageType {
        /**
         * In-memory store on heap without serialization. Very fast, but data
         * affects the Garbage Collector like ordinary Java collections.
         */
        ONHEAP,

        /**
         * In-memory store; entries are serialized into {@code byte[]} so they
         * are invisible to the Garbage Collector. Data is lost on JVM exit.
         */
        MEMORY,

        /**
         * In-memory store using {@code DirectByteBuffer} outside the heap, so
         * the Garbage Collector is unaffected. Increase direct memory with the
         * {@code -XX:MaxDirectMemorySize=10G} JVM param if needed. Data is
         * lost on JVM exit.
         */
        DIRECT_MEMORY,

        /**
         * File-backed store; by default uses File.createTempFile("window-", ".map").
         */
        FILE_RAF
    }

    private static final Logger LOG = LoggerFactory.getLogger(StreamWindowRepository.class);

    /** One pooled DB per storage type; guarded by synchronized (dbPool). */
    private final Map<StorageType, DB> dbPool;

    private static StreamWindowRepository repository;

    private StreamWindowRepository() {
        dbPool = new HashMap<>();
    }

    /**
     * Returns the singleton repository. On first call a JVM shutdown hook is
     * registered so pooled DB instances are closed automatically on exit.
     *
     * @return StreamWindowRepository singleton instance
     */
    public static StreamWindowRepository getSingletonInstance() {
        synchronized (StreamWindowRepository.class) {
            if (repository == null) {
                repository = new StreamWindowRepository();
                Runtime.getRuntime().addShutdownHook(new Thread(() -> repository.close()));
            }
            return repository;
        }
    }

    /**
     * Returns the pooled DB for the given storage type, creating it lazily.
     *
     * @throws IllegalStateException    if the FILE_RAF temp file cannot be created
     * @throws IllegalArgumentException on an unknown storage type
     */
    private DB createMapDB(StorageType storageType) {
        synchronized (dbPool) {
            DB existing = dbPool.get(storageType);
            if (existing != null) {
                return existing;
            }
            DB db;
            switch (storageType) {
                case ONHEAP:
                    db = DBMaker.heapDB().closeOnJvmShutdown().make();
                    LOG.info("Create ONHEAP mapdb");
                    break;
                case MEMORY:
                    db = DBMaker.memoryDB().closeOnJvmShutdown().make();
                    LOG.info("Create MEMORY mapdb");
                    break;
                case DIRECT_MEMORY:
                    db = DBMaker.memoryDirectDB().closeOnJvmShutdown().make();
                    LOG.info("Create DIRECT_MEMORY mapdb");
                    break;
                case FILE_RAF:
                    try {
                        File file = File.createTempFile("window-", ".map");
                        // Fix: check before using the handle, not after delete().
                        Preconditions.checkNotNull(file, "file is null");
                        // MapDB creates the store itself; remove the placeholder first.
                        file.delete();
                        file.deleteOnExit();
                        db = DBMaker.fileDB(file).deleteFilesAfterClose().make();
                        LOG.info("Created FILE_RAF map file at {}", file.getAbsolutePath());
                    } catch (IOException e) {
                        throw new IllegalStateException(e);
                    }
                    break;
                default:
                    throw new IllegalArgumentException("Illegal storage type: " + storageType);
            }
            dbPool.put(storageType, db);
            return db;
        }
    }

    /**
     * Creates a window over [start, end) with the given margin, backed by the
     * requested storage type.
     */
    public StreamWindow createWindow(long start, long end, long margin, StorageType type) {
        StreamWindow ret;
        switch (type) {
            case ONHEAP:
                ret = new StreamSortedWindowOnHeap(start, end, margin);
                break;
            default:
                ret = new StreamSortedWindowInMapDB(
                    start, end, margin,
                    createMapDB(type),
                    UUID.randomUUID().toString()
                );
                break;
        }

        if (LOG.isDebugEnabled()) {
            LOG.debug("Created new {}, type: {}", ret, type);
        }
        return ret;
    }

    /** Creates a window, letting the strategy decide the storage type. */
    public StreamWindow createWindow(long start, long end, long margin, StreamWindowStrategy strategy) {
        return strategy.createWindow(start, end, margin, this);
    }

    /** Creates an on-heap window (default strategy). */
    public StreamWindow createWindow(long start, long end, long margin) {
        return OnHeapStrategy.INSTANCE.createWindow(start, end, margin, this);
    }

    /**
     * Closes and clears all pooled DB instances. Synchronized on dbPool for
     * consistency with createMapDB: the shutdown hook may run concurrently
     * with window creation.
     */
    public void close() {
        synchronized (dbPool) {
            for (Map.Entry<StorageType, DB> entry : dbPool.entrySet()) {
                entry.getValue().close();
            }
            dbPool.clear();
        }
    }

    /** Pluggable policy choosing the storage type for a requested window. */
    public interface StreamWindowStrategy {
        StreamWindow createWindow(long start, long end, long margin, StreamWindowRepository repository);
    }

    /** Always allocates on-heap windows. */
    public static class OnHeapStrategy implements StreamWindowStrategy {
        public static final OnHeapStrategy INSTANCE = new OnHeapStrategy();

        @Override
        public StreamWindow createWindow(long start, long end, long margin, StreamWindowRepository repository) {
            return repository.createWindow(start, end, margin, StorageType.ONHEAP);
        }
    }

    /**
     * Picks storage by window length: on-heap for short windows, direct memory
     * for medium ones, file-backed for anything longer.
     */
    public static class WindowSizeStrategy implements StreamWindowStrategy {
        private static final long ONE_HOUR = 3600 * 1000;
        private static final long FIVE_HOURS = 5 * 3600 * 1000;
        private final long onheapWindowSizeLimit;
        private final long offheapWindowSizeLimit;

        // Fix: was a mutable public static field; now final.
        public static final WindowSizeStrategy INSTANCE = new WindowSizeStrategy(ONE_HOUR, FIVE_HOURS);

        /**
         * @throws IllegalStateException if offheapWindowSizeLimit is smaller
         *                               than onheapWindowSizeLimit
         */
        public WindowSizeStrategy(long onheapWindowSizeLimit, long offheapWindowSizeLimit) {
            this.offheapWindowSizeLimit = offheapWindowSizeLimit;
            this.onheapWindowSizeLimit = onheapWindowSizeLimit;

            if (this.offheapWindowSizeLimit < this.onheapWindowSizeLimit) {
                throw new IllegalStateException("offheapWindowSizeLimit " + this.offheapWindowSizeLimit + " < onheapWindowSizeLimit " + this.onheapWindowSizeLimit);
            }
        }

        @Override
        public StreamWindow createWindow(long start, long end, long margin, StreamWindowRepository repository) {
            long windowLength = end - start;
            if (windowLength <= onheapWindowSizeLimit) {
                return repository.createWindow(start, end, margin, StreamWindowRepository.StorageType.ONHEAP);
            } else if (windowLength <= offheapWindowSizeLimit) {
                // Fix: original used bitwise '&' and redundantly re-checked the
                // lower bound already excluded by the first branch.
                return repository.createWindow(start, end, margin, StreamWindowRepository.StorageType.DIRECT_MEMORY);
            } else {
                return repository.createWindow(start, end, margin, StreamWindowRepository.StorageType.FILE_RAF);
            }
        }
    }
}
http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/impl/CachedEventGroupSerializer.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/impl/CachedEventGroupSerializer.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/impl/CachedEventGroupSerializer.java deleted file mode 100644 index 73adee6..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/impl/CachedEventGroupSerializer.java +++ /dev/null @@ -1,115 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
package org.apache.eagle.alert.engine.sorter.impl;

import org.apache.eagle.alert.engine.coordinator.StreamPartition;
import org.apache.eagle.alert.engine.model.PartitionedEvent;
import org.apache.eagle.alert.engine.model.StreamEvent;
import org.apache.eagle.alert.engine.utils.SerializableUtils;
import org.apache.commons.lang.builder.HashCodeBuilder;
import org.jetbrains.annotations.NotNull;
import org.mapdb.DataInput2;
import org.mapdb.DataOutput2;
import org.mapdb.serializer.GroupSerializerObjectArray;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

/**
 * MapDB group serializer for arrays of PartitionedEvent sharing a timestamp.
 * StreamPartition objects are cached by hash code so each distinct partition
 * is kept once per serializer instance instead of per event.
 *
 * @deprecated performance is worse, should investigate.
 */
@Deprecated
public class CachedEventGroupSerializer extends GroupSerializerObjectArray<PartitionedEvent[]> {
    // NOTE(review): grows without bound for the lifetime of the serializer, and
    // because it is per-instance, events can only be deserialized by the same
    // process that serialized them.
    private Map<Integer, StreamPartition> hashCodePartitionDict = new HashMap<>();

    /**
     * Writes one event: packed partition key, partition hash code (0 = no
     * partition), then the compressed event bytes (length-prefixed, 0 = none).
     */
    private void writePartitionedEvent(DataOutput2 out, PartitionedEvent event) throws IOException {
        out.packLong(event.getPartitionKey());
        int partitionHashCode = 0;
        if (event.getPartition() != null) {
            partitionHashCode = event.getPartition().hashCode();
            if (!hashCodePartitionDict.containsKey(partitionHashCode)) {
                hashCodePartitionDict.put(partitionHashCode, event.getPartition());
            }
        }
        // NOTE(review): a real partition whose hashCode() happens to be 0 would
        // be read back as "no partition" — confirm StreamPartition never hashes to 0.
        out.packInt(partitionHashCode);
        if (event.getEvent() != null) {
            byte[] eventBytes = SerializableUtils.serializeToCompressedByteArray(event.getEvent());
            out.packInt(eventBytes.length);
            out.write(eventBytes);
        } else {
            out.packInt(0);
        }
    }

    /** Reads one event in the format produced by writePartitionedEvent. */
    private PartitionedEvent readPartitionedEvent(DataInput2 in) throws IOException {
        PartitionedEvent event = new PartitionedEvent();
        event.setPartitionKey(in.unpackLong());
        int partitionHashCode = in.unpackInt();
        if (partitionHashCode != 0 && hashCodePartitionDict.containsKey(partitionHashCode)) {
            event.setPartition(hashCodePartitionDict.get(partitionHashCode));
        }
        int eventBytesLen = in.unpackInt();
        if (eventBytesLen > 0) {
            byte[] eventBytes = new byte[eventBytesLen];
            in.readFully(eventBytes);
            event.setEvent((StreamEvent) SerializableUtils.deserializeFromCompressedByteArray(eventBytes, "Deserialize event from bytes"));
        }
        return event;
    }

    @Override
    public void serialize(DataOutput2 out, PartitionedEvent[] value) throws IOException {
        out.packInt(value.length);
        for (PartitionedEvent event : value) {
            writePartitionedEvent(out, event);
        }
    }

    @Override
    public PartitionedEvent[] deserialize(DataInput2 in, int available) throws IOException {
        final int size = in.unpackInt();
        PartitionedEvent[] ret = new PartitionedEvent[size];
        for (int i = 0; i < size; i++) {
            ret[i] = readPartitionedEvent(in);
        }
        return ret;
    }

    @Override
    public boolean isTrusted() {
        return true;
    }

    /**
     * Groups are considered equal when their first events share a timestamp.
     * Fix: the original indexed a1[0]/a2[0] unconditionally and threw on null
     * or empty arrays; those cases now compare as not-equal (or equal by reference).
     */
    @Override
    public boolean equals(PartitionedEvent[] a1, PartitionedEvent[] a2) {
        if (a1 == a2) {
            return true;
        }
        if (a1 == null || a2 == null || a1.length == 0 || a2.length == 0) {
            return false;
        }
        return a1[0].getTimestamp() == a2[0].getTimestamp();
    }

    // NOTE(review): the seed parameter is ignored; HashCodeBuilder uses its own
    // default seeds. Kept as-is since stored hashes may depend on it.
    @Override
    public int hashCode(@NotNull PartitionedEvent[] events, int seed) {
        return new HashCodeBuilder().append(events).toHashCode();
    }

    /** Orders groups by first-event timestamp; empty groups compare as equal. */
    @Override
    public int compare(PartitionedEvent[] o1, PartitionedEvent[] o2) {
        if (o1.length > 0 && o2.length > 0) {
            // Fix: (int)(longA - longB) overflows for large differences;
            // Long.compare is exact.
            return Long.compare(o1[0].getTimestamp(), o2[0].getTimestamp());
        } else {
            return 0;
        }
    }
}
package org.apache.eagle.alert.engine.sorter.impl;

import org.apache.eagle.alert.engine.model.PartitionedEvent;
import org.apache.eagle.alert.engine.utils.SerializableUtils;
import org.jetbrains.annotations.NotNull;
import org.mapdb.DataInput2;
import org.mapdb.DataOutput2;
import org.mapdb.Serializer;
import org.mapdb.serializer.GroupSerializer;

import java.io.IOException;
import java.util.Comparator;

/**
 * GroupSerializer that represents each PartitionedEvent[] group as a single
 * compressed byte[] and forwards every group-array operation to
 * {@link Serializer#BYTE_ARRAY}.
 */
public class PartitionedEventGroupSerializer implements GroupSerializer<PartitionedEvent[]> {
    private static final GroupSerializer<byte[]> BYTE_ARRAY_DELEGATE = Serializer.BYTE_ARRAY;

    /** Compresses an event group into its byte[] wire form. */
    private static byte[] toBytes(PartitionedEvent[] events) {
        return SerializableUtils.serializeToCompressedByteArray(events);
    }

    /** Restores an event group from its byte[] wire form. */
    private static PartitionedEvent[] fromBytes(byte[] bytes) {
        return (PartitionedEvent[]) SerializableUtils.deserializeFromCompressedByteArray(bytes, "deserialize as stream event");
    }

    @Override
    public void serialize(@NotNull DataOutput2 out, @NotNull PartitionedEvent[] value) throws IOException {
        BYTE_ARRAY_DELEGATE.serialize(out, toBytes(value));
    }

    @Override
    public PartitionedEvent[] deserialize(@NotNull DataInput2 input, int available) throws IOException {
        return fromBytes(BYTE_ARRAY_DELEGATE.deserialize(input, available));
    }

    @Override
    public int valueArraySearch(Object keys, PartitionedEvent[] key) {
        return BYTE_ARRAY_DELEGATE.valueArraySearch(keys, toBytes(key));
    }

    @SuppressWarnings("rawtypes")
    @Override
    public int valueArraySearch(Object keys, PartitionedEvent[] key, Comparator comparator) {
        return BYTE_ARRAY_DELEGATE.valueArraySearch(keys, toBytes(key), comparator);
    }

    @Override
    public void valueArraySerialize(DataOutput2 out, Object vals) throws IOException {
        BYTE_ARRAY_DELEGATE.valueArraySerialize(out, vals);
    }

    @Override
    public Object valueArrayDeserialize(DataInput2 in, int size) throws IOException {
        return BYTE_ARRAY_DELEGATE.valueArrayDeserialize(in, size);
    }

    @Override
    public PartitionedEvent[] valueArrayGet(Object vals, int pos) {
        return fromBytes(BYTE_ARRAY_DELEGATE.valueArrayGet(vals, pos));
    }

    @Override
    public int valueArraySize(Object vals) {
        return BYTE_ARRAY_DELEGATE.valueArraySize(vals);
    }

    @Override
    public Object valueArrayEmpty() {
        return BYTE_ARRAY_DELEGATE.valueArrayEmpty();
    }

    @Override
    public Object valueArrayPut(Object vals, int pos, PartitionedEvent[] newValue) {
        return BYTE_ARRAY_DELEGATE.valueArrayPut(vals, pos, toBytes(newValue));
    }

    @Override
    public Object valueArrayUpdateVal(Object vals, int pos, PartitionedEvent[] newValue) {
        return BYTE_ARRAY_DELEGATE.valueArrayUpdateVal(vals, pos, toBytes(newValue));
    }

    @Override
    public Object valueArrayFromArray(Object[] objects) {
        return BYTE_ARRAY_DELEGATE.valueArrayFromArray(objects);
    }

    @Override
    public Object valueArrayCopyOfRange(Object vals, int from, int to) {
        return BYTE_ARRAY_DELEGATE.valueArrayCopyOfRange(vals, from, to);
    }

    @Override
    public Object valueArrayDeleteValue(Object vals, int pos) {
        return BYTE_ARRAY_DELEGATE.valueArrayDeleteValue(vals, pos);
    }
}
package org.apache.eagle.alert.engine.sorter.impl;

import org.apache.eagle.alert.engine.model.PartitionedEvent;

import java.util.Comparator;
import java.util.Objects;

/**
 * Orders PartitionedEvents by timestamp, with nulls first.
 *
 * <p>NOTE(review): for two distinct events with equal timestamps this
 * comparator deliberately returns -1 rather than 0, so comparator-based
 * collections (e.g. TreeMultiset) keep both events instead of collapsing them
 * into one. This makes the comparator inconsistent with equals and asymmetric
 * for that case — appears intentional (original labeled it "Unstable Sorting
 * Algorithm"); confirm before normalizing.</p>
 *
 * <p>TODO: Stable sorting algorithm for better performance to avoid event
 * resorting with same timestamp?.</p>
 */
public class PartitionedEventTimeOrderingComparator implements Comparator<PartitionedEvent> {
    public static final PartitionedEventTimeOrderingComparator INSTANCE = new PartitionedEventTimeOrderingComparator();

    @Override
    public int compare(PartitionedEvent o1, PartitionedEvent o2) {
        // Handles identity, both-null, and value-equal events in one check.
        if (Objects.equals(o1, o2)) {
            return 0;
        }
        // Both-null was already handled above, so exactly one side can be null here
        // (the original re-checked "o1 == null && o2 == null", which was unreachable).
        if (o2 == null) {
            return 1;
        }
        if (o1 == null) {
            return -1;
        }
        // Intentionally biased: equal timestamps on distinct events yield -1 (see class doc).
        if (o1.getTimestamp() <= o2.getTimestamp()) {
            return -1;
        } else {
            return 1;
        }
    }
}
package org.apache.eagle.alert.engine.sorter.impl;

import org.apache.eagle.alert.engine.PartitionedEventCollector;
import org.apache.eagle.alert.engine.coordinator.StreamSortSpec;
import org.apache.eagle.alert.engine.model.PartitionedEvent;
import org.apache.eagle.alert.engine.router.StreamSortHandler;
import org.apache.eagle.alert.engine.sorter.StreamTimeClock;
import org.apache.eagle.alert.engine.sorter.StreamWindow;
import org.apache.eagle.alert.engine.sorter.StreamWindowManager;
import org.apache.eagle.common.DateTimeUtil;

import org.joda.time.Period;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;

/**
 * Routes each incoming PartitionedEvent into the live sort window covering its
 * timestamp, creating new windows as needed and dropping events that arrive
 * too late for any window.
 */
public class StreamSortWindowHandlerImpl implements StreamSortHandler {
    private static final Logger LOG = LoggerFactory.getLogger(StreamSortWindowHandlerImpl.class);
    private StreamWindowManager windowManager;
    // Fix: field was named "streamSortSpecSpec" (stutter), inconsistent with
    // the "streamSortSpec is null" message thrown from hashCode().
    private StreamSortSpec streamSortSpec;
    private PartitionedEventCollector outputCollector;
    private String streamId;

    /**
     * Initializes the window manager from the sort spec's period/margin.
     * Must be called before nextEvent/onTick.
     */
    public void prepare(String streamId, StreamSortSpec streamSortSpecSpec, PartitionedEventCollector outputCollector) {
        this.windowManager = new StreamWindowManagerImpl(
            Period.parse(streamSortSpecSpec.getWindowPeriod()),
            streamSortSpecSpec.getWindowMargin(),
            PartitionedEventTimeOrderingComparator.INSTANCE,
            outputCollector);
        this.streamSortSpec = streamSortSpecSpec;
        this.streamId = streamId;
        this.outputCollector = outputCollector;
    }

    /**
     * Entry point to manage window lifecycle.
     *
     * <p>Offers the event to every alive window; if none accepts it and it is
     * not already expired, a new window is created for it. Expired events are
     * dropped to the collector.</p>
     *
     * @param event StreamEvent
     */
    public void nextEvent(PartitionedEvent event) {
        final long eventTime = event.getEvent().getTimestamp();
        boolean handled = false;

        synchronized (this.windowManager) {
            for (StreamWindow window : this.windowManager.getWindows()) {
                if (window.alive() && window.add(event)) {
                    handled = true;
                }
            }

            // No window found for the event but not too late being rejected
            if (!handled && !windowManager.reject(eventTime)) {
                // later then all events, create later window
                StreamWindow window = windowManager.addNewWindow(eventTime);
                if (window.add(event)) {
                    LOG.info("Created {} of {} at {}", window, this.streamId, DateTimeUtil.millisecondsToHumanDateWithMilliseconds(eventTime));
                    handled = true;
                }
            }
        }

        if (!handled) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Drop expired event {}", event);
            }
            outputCollector.drop(event);
        }
    }

    @Override
    public void onTick(StreamTimeClock clock, long globalSystemTime) {
        windowManager.onTick(clock, globalSystemTime);
    }

    @Override
    public void close() {
        try {
            windowManager.close();
        } catch (IOException e) {
            LOG.error("Got exception while closing window manager", e);
        }
    }

    // Fix: removed a redundant toString() override that only returned
    // super.toString() — behavior is unchanged without it.

    /**
     * Delegates to the sort spec's hash code.
     *
     * @throws NullPointerException if called before prepare()
     */
    @Override
    public int hashCode() {
        if (streamSortSpec == null) {
            throw new NullPointerException("streamSortSpec is null");
        } else {
            return streamSortSpec.hashCode();
        }
    }
}
package org.apache.eagle.alert.engine.sorter.impl;

import org.apache.eagle.alert.engine.PartitionedEventCollector;
import org.apache.eagle.alert.engine.model.PartitionedEvent;
import org.apache.eagle.alert.engine.model.StreamEvent;
import org.apache.eagle.alert.engine.sorter.BaseStreamWindow;
import org.apache.commons.lang3.time.StopWatch;
import org.mapdb.BTreeMap;
import org.mapdb.DB;
import org.mapdb.Serializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * StreamSortedWindow based on MapDB to support off-heap or disk storage.
 * Stable sorting algorithm
 * See <a href="http://www.mapdb.org">http://www.mapdb.org</a>
 */
public class StreamSortedWindowInMapDB extends BaseStreamWindow {
    private final String mapId;
    // Events keyed by timestamp; all events sharing a timestamp are stored in one array.
    private BTreeMap<Long, PartitionedEvent[]> btreeMap;
    private static final Logger LOG = LoggerFactory.getLogger(StreamSortedWindowInMapDB.class);
    // Logical event count (entries in the arrays, not map keys).
    private final AtomicInteger size;
    // Number of same-timestamp collisions handled via replace(); reset on flush.
    private long replaceOpCount = 0;
    private static final PartitionedEventGroupSerializer STREAM_EVENT_GROUP_SERIALIZER = new PartitionedEventGroupSerializer();

    /**
     * @param mapId physical map id, used to decide whether to reuse or not.
     */
    @SuppressWarnings("unused")
    public StreamSortedWindowInMapDB(long start, long end, long margin, DB db, String mapId) {
        super(start, end, margin);
        this.mapId = mapId;
        try {
            // NOTE(review): the explicit type witness <Long, StreamEvent> does not
            // match the PartitionedEvent[] value serializer or the btreeMap field
            // type — verify against the MapDB version's treeMap() signature.
            btreeMap = db.<Long, StreamEvent>treeMap(mapId)
                .keySerializer(Serializer.LONG)
                .valueSerializer(STREAM_EVENT_GROUP_SERIALIZER)
                .createOrOpen();
            LOG.debug("Created BTree map {}", mapId);
        } catch (Error error) {
            // NOTE(review): swallowing Error leaves btreeMap null, so a later
            // add()/flush()/close() would NPE — confirm this is intended best-effort.
            LOG.info("Failed create BTree {}", mapId, error);
        }
        size = new AtomicInteger(0);
    }

    /**
     * Assumed: most of adding operation will do putting only and few require replacing.
     * <ol>
     * <li>
     * First of all, always try to put with created event directly
     * </li>
     * <li>
     * If not absent (key already exists), then append and replace,
     * replace operation will cause more consumption
     * </li>
     * </ol>
     *
     * @param event coming-in event
     * @return whether success (false only when the timestamp is outside the window)
     */
    @Override
    public synchronized boolean add(PartitionedEvent event) {
        long timestamp = event.getEvent().getTimestamp();
        if (accept(timestamp)) {
            // NOTE(review): this branch treats a false return from
            // putIfAbsentBoolean as "insert succeeded" (fast path) and a true
            // return as "key already existed" (collision path). Confirm against
            // the MapDB version in use — if putIfAbsentBoolean returns true on
            // successful insert, the two branches are inverted.
            boolean absent = btreeMap.putIfAbsentBoolean(timestamp, new PartitionedEvent[] {event});
            if (!absent) {
                size.incrementAndGet();
                return true;
            } else {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Duplicated timestamp {}, will reduce performance as replacing", timestamp);
                }
                // Append the new event to the existing group and replace atomically.
                PartitionedEvent[] oldValue = btreeMap.get(timestamp);
                // NOTE(review): if oldValue is null here (key vanished), replace()
                // below returns null and the IllegalStateException message would
                // itself NPE on oldValue.length — verify this path is unreachable.
                PartitionedEvent[] newValue = oldValue == null ? new PartitionedEvent[1] : Arrays.copyOf(oldValue, oldValue.length + 1);
                newValue[newValue.length - 1] = event;
                PartitionedEvent[] removedValue = btreeMap.replace(timestamp, newValue);
                replaceOpCount++;
                if (replaceOpCount % 1000 == 0) {
                    LOG.warn("Too many events ({}) with overlap timestamp, may reduce insertion performance", replaceOpCount);
                }
                if (removedValue != null) {
                    size.incrementAndGet();
                } else {
                    throw new IllegalStateException("Failed to replace key " + timestamp + " with " + newValue.length + " entities array to replace old " + oldValue.length + " entities array");
                }
                return true;
            }
        } else {
            // Timestamp outside [start - margin, end): reject.
            return false;
        }
    }

    /**
     * Emits all buffered events to the collector in key (timestamp) order,
     * then clears the map and resets counters.
     */
    @Override
    protected synchronized void flush(PartitionedEventCollector collector) {
        StopWatch stopWatch = new StopWatch();
        stopWatch.start();
        btreeMap.valueIterator().forEachRemaining((events) -> {
            for (PartitionedEvent event : events) {
                collector.emit(event);
            }
        });
        btreeMap.clear();
        replaceOpCount = 0;
        stopWatch.stop();
        LOG.info("Flushed {} events in {} ms", size, stopWatch.getTime());
        size.set(0);
    }

    /** Closes the parent window first, then the backing BTree map. */
    @Override
    public synchronized void close() {
        super.close();
        btreeMap.close();
        LOG.info("Closed {}", this.mapId);
    }

    @Override
    public synchronized int size() {
        return size.get();
    }
}
package org.apache.eagle.alert.engine.sorter.impl;

import org.apache.eagle.alert.engine.PartitionedEventCollector;
import org.apache.eagle.alert.engine.model.PartitionedEvent;
import org.apache.eagle.alert.engine.sorter.BaseStreamWindow;

import com.google.common.collect.TreeMultiset;
import org.apache.commons.lang3.time.StopWatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Comparator;

/**
 * Stream window that buffers events on the JVM heap in a sorted {@link TreeMultiset},
 * ordered by the supplied comparator. All access to the multiset is guarded by
 * synchronizing on the multiset instance itself.
 */
public class StreamSortedWindowOnHeap extends BaseStreamWindow {
    private static final Logger LOG = LoggerFactory.getLogger(StreamSortedWindowOnHeap.class);
    private final TreeMultiset<PartitionedEvent> treeMultisetCache;

    /**
     * @param start start time.
     * @param end end time.
     * @param margin margin time.
     * @param comparator ordering applied to buffered events.
     */
    public StreamSortedWindowOnHeap(long start, long end, long margin, Comparator<PartitionedEvent> comparator) {
        super(start, end, margin);
        treeMultisetCache = TreeMultiset.create(comparator);
    }

    /** Convenience constructor using the default time-ordering comparator. */
    public StreamSortedWindowOnHeap(long start, long end, long margin) {
        this(start, end, margin, new PartitionedEventTimeOrderingComparator());
    }

    /**
     * Buffers the event if its timestamp falls inside the acceptable window range.
     *
     * @return true when the event was accepted, false when it was rejected.
     */
    @Override
    public boolean add(PartitionedEvent partitionedEvent) {
        synchronized (treeMultisetCache) {
            long eventTime = partitionedEvent.getEvent().getTimestamp();
            if (!accept(eventTime)) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("{} is not acceptable, ignored", partitionedEvent);
                }
                return false;
            }
            treeMultisetCache.add(partitionedEvent);
            return true;
        }
    }

    /** Emits all buffered events in sorted order, then empties the buffer. */
    @Override
    protected void flush(PartitionedEventCollector collector) {
        synchronized (treeMultisetCache) {
            StopWatch watch = new StopWatch();
            watch.start();
            int flushedCount = treeMultisetCache.size();
            for (PartitionedEvent bufferedEvent : treeMultisetCache) {
                collector.emit(bufferedEvent);
            }
            treeMultisetCache.clear();
            watch.stop();
            LOG.info("Flushed {} events in {} ms from {}", flushedCount, watch.getTime(), this.toString());
        }
    }

    /** @return number of events currently buffered. */
    @Override
    public int size() {
        synchronized (treeMultisetCache) {
            return treeMultisetCache.size();
        }
    }
}
- * TODO: maybe need to synchronize time clock globally, how to? - */ -public class StreamTimeClockInLocalMemory implements StreamTimeClock { - private final AtomicLong currentTime; - private final String streamId; - - public StreamTimeClockInLocalMemory(String streamId, long initialTime) { - this.streamId = streamId; - this.currentTime = new AtomicLong(initialTime); - } - - public StreamTimeClockInLocalMemory(String streamId) { - this(streamId, 0L); - } - - @Override - public void moveForward(long timestamp) { - if (timestamp < currentTime.get()) { - throw new IllegalArgumentException(timestamp + " < " + currentTime.get() + ", should not move time back"); - } - this.currentTime.set(timestamp); - } - - @Override - public String getStreamId() { - return streamId; - } - - @Override - public long getTime() { - return currentTime.get(); - } - - @Override - public String toString() { - return String.format("StreamClock[streamId=%s, now=%s]", streamId, DateTimeUtil.millisecondsToHumanDateWithMilliseconds(currentTime.get())); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/impl/StreamTimeClockManagerImpl.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/impl/StreamTimeClockManagerImpl.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/impl/StreamTimeClockManagerImpl.java deleted file mode 100644 index b59918d..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/impl/StreamTimeClockManagerImpl.java +++ /dev/null @@ -1,169 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
package org.apache.eagle.alert.engine.sorter.impl;

import org.apache.eagle.alert.engine.sorter.StreamTimeClock;
import org.apache.eagle.alert.engine.sorter.StreamTimeClockListener;
import org.apache.eagle.alert.engine.sorter.StreamTimeClockManager;
import org.apache.eagle.common.DateTimeUtil;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashMap;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Manages per-stream time clocks and the listeners driven by them.
 * A background {@link Timer} ticks every second and notifies all registered listeners.
 */
public final class StreamTimeClockManagerImpl implements StreamTimeClockManager {
    private static final long serialVersionUID = -2770823821511195343L;
    private static final Logger LOG = LoggerFactory.getLogger(StreamTimeClockManagerImpl.class);
    // Guarded by synchronizing on the map itself.
    private final Map<String, StreamTimeClock> streamIdTimeClockMap;
    private Timer timer;

    // Guarded by synchronizing on the map itself.
    private final Map<StreamTimeClockListener, String> listenerStreamIdMap;
    // Counter used only to give each scheduler timer thread a unique name.
    private static final AtomicInteger num = new AtomicInteger();

    public StreamTimeClockManagerImpl() {
        listenerStreamIdMap = new HashMap<>();
        streamIdTimeClockMap = new HashMap<>();
        timer = new Timer("StreamScheduler-" + num.getAndIncrement());
        timer.schedule(new TimerTask() {
            @Override
            public void run() {
                // Make sure the timer tick happens one by one
                triggerTickOnAll();
            }
        }, 1000, 1000);
    }

    /**
     * By default, we could keep the current time clock in memory,
     * Eventually we may need to consider the global time synchronization across all nodes
     * 1) When to initialize window according to start time
     * 2) When to close expired window according to current time
     *
     * @return StreamTimeClock instance.
     */
    @Override
    public StreamTimeClock createStreamTimeClock(String streamId) {
        synchronized (streamIdTimeClockMap) {
            if (!streamIdTimeClockMap.containsKey(streamId)) {
                StreamTimeClock instance = new StreamTimeClockInLocalMemory(streamId);
                LOG.info("Created {}", instance);
                streamIdTimeClockMap.put(streamId, instance);
            } else {
                // Fixed typo ("existss") and switched to parameterized logging.
                LOG.warn("TimeClock for stream already exists: {}", streamIdTimeClockMap.get(streamId));
            }
            return streamIdTimeClockMap.get(streamId);
        }
    }

    @Override
    public StreamTimeClock getStreamTimeClock(String streamId) {
        synchronized (streamIdTimeClockMap) {
            if (!streamIdTimeClockMap.containsKey(streamId)) {
                LOG.warn("TimeClock for stream {} is not initialized before being called, create now", streamId);
                return createStreamTimeClock(streamId);
            }
            return streamIdTimeClockMap.get(streamId);
        }
    }

    @Override
    public void removeStreamTimeClock(String streamId) {
        synchronized (streamIdTimeClockMap) {
            // Fixed: the original logged map.get(streamId) AFTER removal, which always
            // printed null. Use the value returned by remove() instead.
            StreamTimeClock removed = streamIdTimeClockMap.remove(streamId);
            if (removed != null) {
                LOG.info("Removed TimeClock for stream {}: {}", streamId, removed);
            } else {
                LOG.warn("No TimeClock found for stream {}, nothing to remove", streamId);
            }
        }
    }

    @Override
    public void registerListener(String streamId, StreamTimeClockListener listener) {
        synchronized (listenerStreamIdMap) {
            if (listenerStreamIdMap.containsKey(listener)) {
                throw new IllegalArgumentException("Duplicated listener: " + listener.toString());
            }
            LOG.info("Register {} on {}", listener, streamId);
            listenerStreamIdMap.put(listener, streamId);
        }
    }

    @Override
    public void registerListener(StreamTimeClock streamClock, StreamTimeClockListener listener) {
        registerListener(streamClock.getStreamId(), listener);
    }

    @Override
    public void removeListener(StreamTimeClockListener listener) {
        // Fixed: every other access to listenerStreamIdMap holds the map lock;
        // the original removed without it, racing with registration and ticking.
        synchronized (listenerStreamIdMap) {
            listenerStreamIdMap.remove(listener);
        }
    }

    @Override
    public synchronized void triggerTickOn(String streamId) {
        int count = 0;
        for (Map.Entry<StreamTimeClockListener, String> entry : listenerStreamIdMap.entrySet()) {
            if (entry.getValue().equals(streamId)) {
                entry.getKey().onTick(streamIdTimeClockMap.get(streamId), getCurrentSystemTime());
                count++;
            }
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("Triggered {} time-clock listeners on stream {}", count, streamId);
        }
    }

    private static long getCurrentSystemTime() {
        return System.currentTimeMillis();
    }

    @Override
    public void onTimeUpdate(String streamId, long timestamp) {
        StreamTimeClock timeClock = getStreamTimeClock(streamId);
        if (timeClock == null) {
            return;
        }
        // Trigger time clock only when time moves forward
        if (timestamp >= timeClock.getTime()) {
            timeClock.moveForward(timestamp);
            if (LOG.isDebugEnabled()) {
                LOG.debug("Tick on stream {} with latest time {}", streamId, DateTimeUtil.millisecondsToHumanDateWithMilliseconds(timeClock.getTime()));
            }
            triggerTickOn(streamId);
        }
    }

    private void triggerTickOnAll() {
        synchronized (listenerStreamIdMap) {
            for (Map.Entry<StreamTimeClockListener, String> entry : listenerStreamIdMap.entrySet()) {
                triggerTickOn(entry.getValue());
            }
        }
    }

    /** Cancels the ticking timer and fires one final tick on every stream. */
    @Override
    public void close() {
        timer.cancel();
        triggerTickOnAll();
        LOG.info("Closed StreamTimeClockManager {}", this);
    }
}
---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/impl/StreamWindowManagerImpl.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/impl/StreamWindowManagerImpl.java deleted file mode 100644 index 4c5154b..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/impl/StreamWindowManagerImpl.java +++ /dev/null @@ -1,176 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
package org.apache.eagle.alert.engine.sorter.impl;

import org.apache.eagle.alert.engine.PartitionedEventCollector;
import org.apache.eagle.alert.engine.model.PartitionedEvent;
import org.apache.eagle.alert.engine.sorter.StreamTimeClock;
import org.apache.eagle.alert.engine.sorter.StreamWindow;
import org.apache.eagle.alert.engine.sorter.StreamWindowManager;
import org.apache.eagle.alert.engine.sorter.StreamWindowRepository;
import org.apache.eagle.alert.utils.TimePeriodUtils;
import org.apache.eagle.common.DateTimeUtil;

import org.apache.commons.lang3.time.StopWatch;
import org.joda.time.Period;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.*;

/**
 * Manages the set of live stream windows, keyed by window start time.
 * Windows are created on demand, ticked by the stream clock, and closed/removed
 * once they expire or fall entirely before the reject watermark.
 * All bucket access is guarded by synchronizing on {@code windowBuckets}.
 */
public class StreamWindowManagerImpl implements StreamWindowManager {
    private static final Logger LOG = LoggerFactory.getLogger(StreamWindowManagerImpl.class);
    private final TreeMap<Long, StreamWindow> windowBuckets;
    private final PartitionedEventCollector collector;
    private final Period windowPeriod;
    private final long windowMargin;
    @SuppressWarnings("unused")
    private final Comparator<PartitionedEvent> comparator;
    // Watermark: timestamps strictly before this are rejected.
    private long rejectTime;

    public StreamWindowManagerImpl(Period windowPeriod, long windowMargin, Comparator<PartitionedEvent> comparator, PartitionedEventCollector collector) {
        this.windowBuckets = new TreeMap<>();
        this.windowPeriod = windowPeriod;
        this.windowMargin = windowMargin;
        this.collector = collector;
        this.comparator = comparator;
    }

    /**
     * Creates (and registers) a new window covering the period containing initialTime.
     *
     * @throws IllegalStateException if initialTime is already behind the reject watermark
     */
    @Override
    public StreamWindow addNewWindow(long initialTime) {
        synchronized (windowBuckets) {
            if (reject(initialTime)) {
                throw new IllegalStateException("Failed to create new window, as "
                    + DateTimeUtil.millisecondsToHumanDateWithMilliseconds(initialTime) + " is too late, only allow timestamp after "
                    + DateTimeUtil.millisecondsToHumanDateWithMilliseconds(rejectTime));
            }
            Long startMs = TimePeriodUtils.formatMillisecondsByPeriod(initialTime, windowPeriod);
            Long endMs = startMs + TimePeriodUtils.getMillisecondsOfPeriod(windowPeriod);
            StreamWindow created = StreamWindowRepository.getSingletonInstance().createWindow(startMs, endMs, windowMargin);
            created.register(collector);
            addWindow(created);
            return created;
        }
    }

    /** Registers the window under its start time; a duplicate start time is a caller error. */
    private void addWindow(StreamWindow window) {
        if (windowBuckets.containsKey(window.startTime())) {
            throw new IllegalArgumentException("Duplicated " + window.toString());
        }
        windowBuckets.put(window.startTime(), window);
    }

    @Override
    public void removeWindow(StreamWindow window) {
        synchronized (windowBuckets) {
            windowBuckets.remove(window.startTime());
        }
    }

    @Override
    public boolean hasWindow(StreamWindow window) {
        synchronized (windowBuckets) {
            return windowBuckets.containsKey(window.startTime());
        }
    }

    @Override
    public boolean hasWindowFor(long timestamp) {
        return getWindowFor(timestamp) != null;
    }

    @Override
    public Collection<StreamWindow> getWindows() {
        synchronized (windowBuckets) {
            // NOTE(review): this returns a live view of the map's values that escapes the
            // lock; close() below relies on iterating it, so the behavior is kept as-is.
            return windowBuckets.values();
        }
    }

    /** @return the window whose [startTime, endTime) range contains timestamp, or null. */
    @Override
    public StreamWindow getWindowFor(long timestamp) {
        synchronized (windowBuckets) {
            for (StreamWindow candidate : windowBuckets.values()) {
                if (timestamp >= candidate.startTime() && timestamp < candidate.endTime()) {
                    return candidate;
                }
            }
            return null;
        }
    }

    @Override
    public boolean reject(long timestamp) {
        return timestamp < rejectTime;
    }

    /**
     * Ticks every window, advances the reject watermark to the maximum window reject
     * time, then closes and removes windows that expired or ended before the watermark.
     */
    @Override
    public void onTick(StreamTimeClock clock, long globalSystemTime) {
        synchronized (windowBuckets) {
            // Pass 1: tick all windows and advance the watermark.
            for (StreamWindow bucket : windowBuckets.values()) {
                bucket.onTick(clock, globalSystemTime);
                rejectTime = Math.max(rejectTime, bucket.rejectTime());
            }
            // Pass 2: partition into expired vs. alive using the final watermark.
            List<StreamWindow> expired = new ArrayList<>();
            List<StreamWindow> alive = new ArrayList<>();
            for (StreamWindow bucket : windowBuckets.values()) {
                if (bucket.expired() || bucket.endTime() <= rejectTime) {
                    expired.add(bucket);
                } else {
                    alive.add(bucket);
                }
            }
            for (StreamWindow bucket : expired) {
                closeAndRemoveWindow(bucket);
            }
            if (!expired.isEmpty()) {
                LOG.info("Windows: {} alive = {}, {} expired = {}", alive.size(), alive, expired.size(), expired);
            }
        }
    }

    /** Closes then unregisters a single window, logging how long the teardown took. */
    private void closeAndRemoveWindow(StreamWindow windowBucket) {
        StopWatch watch = new StopWatch();
        watch.start();
        closeWindow(windowBucket);
        removeWindow(windowBucket);
        watch.stop();
        LOG.info("Removed {} in {} ms", windowBucket, watch.getTime());
    }

    private void closeWindow(StreamWindow windowBucket) {
        windowBucket.close();
    }

    /** Closes every managed window and clears the bucket map. */
    public void close() {
        synchronized (windowBuckets) {
            LOG.debug("Closing");
            StopWatch watch = new StopWatch();
            watch.start();
            int closedCount = 0;
            for (StreamWindow bucket : getWindows()) {
                closedCount++;
                closeWindow(bucket);
            }
            windowBuckets.clear();
            watch.stop();
            LOG.info("Closed {} windows in {} ms", closedCount, watch.getTime());
        }
    }
}
package org.apache.eagle.alert.engine.spout;

import backtype.storm.spout.MultiScheme;
import backtype.storm.spout.Scheme;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import org.apache.commons.collections.CollectionUtils;
import org.apache.eagle.alert.coordination.model.Kafka2TupleMetadata;
import org.apache.eagle.alert.coordination.model.SpoutSpec;
import org.apache.eagle.alert.engine.coordinator.IMetadataChangeNotifyService;
import org.apache.eagle.alert.engine.coordinator.MetadataType;
import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
import org.apache.eagle.alert.engine.router.SpoutSpecListener;
import org.apache.eagle.alert.engine.serialization.PartitionedEventSerializer;
import org.apache.eagle.alert.engine.serialization.SerializationMetadataProvider;
import org.apache.eagle.alert.engine.serialization.Serializers;
import org.apache.eagle.alert.utils.AlertConstants;
import org.apache.eagle.alert.utils.StreamIdConversion;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import storm.kafka.*;

import java.text.MessageFormat;
import java.util.*;

/**
 * wrap KafkaSpout to provide parallel processing of messages for multiple Kafka topics
 * <p>1. onNewConfig() is interface for outside to update new metadata. Upon new metadata, this class will calculate if there is any new topic, removed topic or
 * updated topic</p>
 */
public class CorrelationSpout extends BaseRichSpout implements SpoutSpecListener, SerializationMetadataProvider {
    private static final long serialVersionUID = -5280723341236671580L;
    private static final Logger LOG = LoggerFactory.getLogger(CorrelationSpout.class);

    public static final String DEFAULT_STORM_KAFKA_TRANSACTION_ZK_ROOT = "/consumers";
    public static final String DEFAULT_STORM_KAFKA_TRANSACTION_ZK_RELATIVE_PATH = "/eagle_consumer";

    // topic to KafkaSpoutWrapper
    private volatile Map<String, KafkaSpoutWrapper> kafkaSpoutList = new HashMap<>();
    private int numOfRouterBolts;

    private SpoutSpec cachedSpoutSpec;

    private transient KafkaSpoutMetric kafkaSpoutMetric;

    @SuppressWarnings("rawtypes")
    private Map conf;
    private TopologyContext context;
    private SpoutOutputCollector collector;
    private final Config config;
    private String topologyId;
    private String spoutName;
    private String routeBoltName;
    @SuppressWarnings("unused")
    private int taskIndex;
    private IMetadataChangeNotifyService changeNotifyService;
    private PartitionedEventSerializer serializer;
    private volatile Map<String, StreamDefinition> sds;

    /**
     * FIXME one single changeNotifyService may have issues as possibly multiple spout tasks will register themselves and initialize service.
     *
     * @param config topology configuration
     * @param topologyId used for distinguishing kafka offset for different topologies
     * @param changeNotifyService service pushing metadata changes to this spout
     * @param numOfRouterBolts used for generating streamId and routing
     */
    public CorrelationSpout(Config config, String topologyId, IMetadataChangeNotifyService changeNotifyService, int numOfRouterBolts) {
        this(config, topologyId, changeNotifyService, numOfRouterBolts, AlertConstants.DEFAULT_SPOUT_NAME, AlertConstants.DEFAULT_ROUTERBOLT_NAME);
    }

    /**
     * @param config topology configuration
     * @param topologyId used for distinguishing kafka offset for different topologies
     * @param changeNotifyService service pushing metadata changes to this spout
     * @param numOfRouterBolts used for generating streamId and routing
     * @param spoutName used for generating streamId between spout and router bolt
     * @param routerBoltName used for generating streamId between spout and router bolt.
     */
    public CorrelationSpout(Config config, String topologyId, IMetadataChangeNotifyService changeNotifyService, int numOfRouterBolts, String spoutName, String routerBoltName) {
        this.config = config;
        this.topologyId = topologyId;
        this.changeNotifyService = changeNotifyService;
        this.numOfRouterBolts = numOfRouterBolts;
        this.spoutName = spoutName;
        this.routeBoltName = routerBoltName;
    }

    public String getSpoutName() {
        return spoutName;
    }

    public String getRouteBoltName() {
        return routeBoltName;
    }

    /**
     * Declares one stream per router bolt; the only output field is for StreamEvent.
     *
     * @param declarer storm output declarer
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        for (int i = 0; i < numOfRouterBolts; i++) {
            String streamId = StreamIdConversion.generateStreamIdBetween(spoutName, routeBoltName + i);
            declarer.declareStream(streamId, new Fields(AlertConstants.FIELD_0));
            // Switched from string concatenation to parameterized logging.
            LOG.info("declare stream between spout and streamRouterBolt {}", streamId);
        }
    }

    @SuppressWarnings("rawtypes")
    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("open method invoked");
        }
        this.conf = conf;
        this.context = context;
        this.collector = collector;
        this.taskIndex = context.getThisTaskIndex();

        // initialize an empty SpoutSpec
        cachedSpoutSpec = new SpoutSpec(topologyId, new HashMap<>(), new HashMap<>(), new HashMap<>());

        changeNotifyService.registerListener(this);
        changeNotifyService.init(config, MetadataType.SPOUT);

        // register KafkaSpout metric
        kafkaSpoutMetric = new KafkaSpoutMetric();
        context.registerMetric("kafkaSpout", kafkaSpoutMetric, 60);

        this.serializer = Serializers.newPartitionedEventSerializer(this);
    }

    @Override
    public void onSpoutSpecChange(SpoutSpec spec, Map<String, StreamDefinition> sds) {
        LOG.info("new metadata is updated {}", spec);
        try {
            onReload(spec, sds);
        } catch (Exception ex) {
            LOG.error("error applying new SpoutSpec", ex);
        }
    }

    @Override
    public void nextTuple() {
        for (KafkaSpoutWrapper wrapper : kafkaSpoutList.values()) {
            try {
                wrapper.nextTuple();
            } catch (Exception e) {
                LOG.error("unexpected exception is caught: {}", e.getMessage(), e);
            }
        }
    }

    /**
     * find the correct wrapper to do ack that means msgId should be mapped to
     * wrapper.
     *
     * @param msgId a {@link KafkaMessageIdWrapper} carrying the originating topic
     */
    @Override
    public void ack(Object msgId) {
        // decode and get topic
        KafkaMessageIdWrapper id = (KafkaMessageIdWrapper) msgId;
        KafkaSpoutWrapper spout = kafkaSpoutList.get(id.topic);
        if (spout != null) {
            spout.ack(id.id);
        }
    }

    @Override
    public void fail(Object msgId) {
        // decode and get topic
        KafkaMessageIdWrapper id = (KafkaMessageIdWrapper) msgId;
        LOG.error("Failing message {}, with topic {}", msgId, id.topic);
        KafkaSpoutWrapper spout = kafkaSpoutList.get(id.topic);
        if (spout != null) {
            spout.fail(id.id);
        }
    }

    @Override
    public void deactivate() {
        // Fixed: was System.out.println("deactivate") — use the class logger.
        LOG.info("deactivate");
        for (KafkaSpoutWrapper wrapper : kafkaSpoutList.values()) {
            wrapper.deactivate();
        }
    }

    @Override
    public void close() {
        // Fixed: was System.out.println("close") — use the class logger.
        LOG.info("close");
        for (KafkaSpoutWrapper wrapper : kafkaSpoutList.values()) {
            wrapper.close();
        }
    }

    /** @return the Kafka topics referenced by the given spec, in declaration order. */
    private List<String> getTopics(SpoutSpec spoutSpec) {
        List<String> meta = new ArrayList<>();
        for (Kafka2TupleMetadata entry : spoutSpec.getKafka2TupleMetadataMap().values()) {
            meta.add(entry.getTopic());
        }
        return meta;
    }

    /**
     * Applies a new SpoutSpec: computes topic additions/removals/updates against the
     * cached spec, creates/closes/updates the per-topic KafkaSpout wrappers, then
     * swaps in the new state.
     */
    @SuppressWarnings("unchecked")
    public void onReload(final SpoutSpec newMeta, Map<String, StreamDefinition> sds) throws Exception {
        // calculate topic create/remove/update (fixed misspelled local "cachedTopcies")
        List<String> topics = getTopics(newMeta);
        List<String> cachedTopics = getTopics(cachedSpoutSpec);
        Collection<String> newTopics = CollectionUtils.subtract(topics, cachedTopics);
        Collection<String> removeTopics = CollectionUtils.subtract(cachedTopics, topics);
        Collection<String> updateTopics = CollectionUtils.intersection(topics, cachedTopics);

        LOG.info("Topics were added={}, removed={}, modified={}", newTopics, removeTopics, updateTopics);

        // build lookup table for scheme
        Map<String, String> newSchemaName = new HashMap<>();
        Map<String, Map<String, String>> dataSourceProperties = new HashMap<>();
        for (Kafka2TupleMetadata ds : newMeta.getKafka2TupleMetadataMap().values()) {
            newSchemaName.put(ds.getTopic(), ds.getSchemeCls());
            dataSourceProperties.put(ds.getTopic(), ds.getProperties());
        }

        // copy and swap
        Map<String, KafkaSpoutWrapper> newKafkaSpoutList = new HashMap<>(this.kafkaSpoutList);
        // iterate new topics and then create KafkaSpout
        for (String topic : newTopics) {
            KafkaSpoutWrapper wrapper = newKafkaSpoutList.get(topic);
            if (wrapper != null) {
                LOG.warn(MessageFormat.format("try to create new topic {0}, but found in the active spout list, this may indicate some inconsistency", topic));
                continue;
            }
            KafkaSpoutWrapper newWrapper = createKafkaSpout(ConfigFactory.parseMap(dataSourceProperties.get(topic)).withFallback(this.config),
                conf, context, collector, topic, newSchemaName.get(topic), newMeta, sds);
            newKafkaSpoutList.put(topic, newWrapper);
        }
        // iterate remove topics and then close KafkaSpout
        for (String topic : removeTopics) {
            KafkaSpoutWrapper wrapper = newKafkaSpoutList.get(topic);
            if (wrapper == null) {
                LOG.warn(MessageFormat.format("try to remove topic {0}, but not found in the active spout list, this may indicate some inconsistency", topic));
                continue;
            }
            removeKafkaSpout(wrapper);
            newKafkaSpoutList.remove(topic);
        }

        // iterate update topic and then update metadata
        for (String topic : updateTopics) {
            KafkaSpoutWrapper spoutWrapper = newKafkaSpoutList.get(topic);
            if (spoutWrapper == null) {
                LOG.warn(MessageFormat.format("try to update topic {0}, but not found in the active spout list, this may indicate some inconsistency", topic));
                continue;
            }
            spoutWrapper.update(newMeta, sds);
        }

        // swap
        this.cachedSpoutSpec = newMeta;
        this.kafkaSpoutList = newKafkaSpoutList;
        this.sds = sds;
    }

    /**
     * make this method protected to make sure unit test can work well
     * Q: Where to persist consumer state, i.e. what offset has been consumed for each topic and partition
     * A: stormKafkaTransactionZkPath + "/" + consumerId + "/" + topic + "/" + topologyId + "/" + partitionId
     * Note1: PartitionManager.committedPath for composing zkState path, _spoutConfig.zkRoot + "/" + _spoutConfig.id + "/" + _partition.getId();
     * consumerId by default is EagleConsumer unless it is specified by "stormKafkaEagleConsumer"
     * Note2: put topologyId as part of zkState because one topic by design can be consumed by multiple topologies so one topology needs to know
     * processed offset for itself
     * <p>TODO: Should avoid use Config.get in deep calling stack, should generate config bean as early as possible
     * </p>
     *
     * @param configure effective config for this topic (datasource properties with topology config fallback)
     * @param conf storm conf map
     * @param context topology context
     * @param collector output collector
     * @param topic kafka topic to consume
     * @param schemeClsName scheme class name used to deserialize messages
     * @param spoutSpec current spout specification
     * @param sds stream definitions keyed by streamId
     * @return an opened KafkaSpoutWrapper for the topic
     */
    @SuppressWarnings("rawtypes")
    protected KafkaSpoutWrapper createKafkaSpout(Config configure, Map conf, TopologyContext context, SpoutOutputCollector collector, final String topic,
                                                 String schemeClsName, SpoutSpec spoutSpec, Map<String, StreamDefinition> sds) throws Exception {
        String kafkaBrokerZkQuorum = configure.getString(AlertConstants.KAFKA_BROKER_ZK_QUORUM);
        BrokerHosts hosts = null;
        if (configure.hasPath("spout.kafkaBrokerZkBasePath")) {
            hosts = new ZkHosts(kafkaBrokerZkQuorum, configure.getString(AlertConstants.KAFKA_BROKER_ZK_BASE_PATH));
        } else {
            hosts = new ZkHosts(kafkaBrokerZkQuorum);
        }
        String transactionZkRoot = DEFAULT_STORM_KAFKA_TRANSACTION_ZK_ROOT;
        if (configure.hasPath("spout.stormKafkaTransactionZkPath")) {
            transactionZkRoot = configure.getString("spout.stormKafkaTransactionZkPath");
        }
        boolean logEventEnabled = false;
        if (configure.hasPath("topology.logEventEnabled")) {
            logEventEnabled = configure.getBoolean("topology.logEventEnabled");
        }
        // write partition offset etc. into zkRoot+id, see PartitionManager.committedPath
        String zkStateTransactionRelPath = DEFAULT_STORM_KAFKA_TRANSACTION_ZK_RELATIVE_PATH;
        if (configure.hasPath("spout.stormKafkaEagleConsumer")) {
            zkStateTransactionRelPath = configure.getString("spout.stormKafkaEagleConsumer");
        }
        SpoutConfig spoutConfig = new SpoutConfig(hosts, topic, transactionZkRoot, zkStateTransactionRelPath + "/" + topic + "/" + topologyId);
        // transaction zkServers
        boolean stormKafkaUseSameZkQuorumWithKafkaBroker = configure.getBoolean("spout.stormKafkaUseSameZkQuorumWithKafkaBroker");
        if (stormKafkaUseSameZkQuorumWithKafkaBroker) {
            ZkServerPortUtils utils = new ZkServerPortUtils(kafkaBrokerZkQuorum);
            spoutConfig.zkServers = utils.getZkHosts();
            spoutConfig.zkPort = utils.getZkPort();
        } else {
            ZkServerPortUtils utils = new ZkServerPortUtils(configure.getString("spout.stormKafkaTransactionZkQuorum"));
            spoutConfig.zkServers = utils.getZkHosts();
            spoutConfig.zkPort = utils.getZkPort();
        }
        // transaction update interval
        spoutConfig.stateUpdateIntervalMs = configure.hasPath("spout.stormKafkaStateUpdateIntervalMs") ? configure.getInt("spout.stormKafkaStateUpdateIntervalMs") : 2000;
        // Kafka fetch size
        // NOTE(review): 1048586 looks like a typo for 1048576 (1 MB) — kept as-is for
        // backward compatibility; confirm before changing the default.
        spoutConfig.fetchSizeBytes = configure.hasPath("spout.stormKafkaFetchSizeBytes") ? configure.getInt("spout.stormKafkaFetchSizeBytes") : 1048586;
        // "startOffsetTime" is for test usage, prod should not use this
        if (configure.hasPath("spout.stormKafkaStartOffsetTime")) {
            spoutConfig.startOffsetTime = configure.getInt("spout.stormKafkaStartOffsetTime");
        }

        spoutConfig.scheme = createMultiScheme(conf, topic, schemeClsName);
        KafkaSpoutWrapper wrapper = new KafkaSpoutWrapper(spoutConfig, kafkaSpoutMetric);
        SpoutOutputCollectorWrapper collectorWrapper = new SpoutOutputCollectorWrapper(this, collector, topic, spoutSpec, numOfRouterBolts, sds, this.serializer, logEventEnabled);
        wrapper.open(conf, context, collectorWrapper);

        if (LOG.isInfoEnabled()) {
            LOG.info("create and open kafka wrapper: topic {}, scheme class{} ", topic, schemeClsName);
        }
        return wrapper;
    }

    /** Builds a MultiScheme from the configured scheme class, wrapping a plain Scheme if needed. */
    @SuppressWarnings("rawtypes")
    private MultiScheme createMultiScheme(Map conf, String topic, String schemeClsName) throws Exception {
        Object scheme = SchemeBuilder.buildFromClsName(schemeClsName, topic, conf);
        if (scheme instanceof MultiScheme) {
            return (MultiScheme) scheme;
        } else if (scheme instanceof Scheme) {
            return new SchemeAsMultiScheme((Scheme) scheme);
        } else {
            LOG.error("create spout scheme failed.");
            throw new IllegalArgumentException("create spout scheme failed.");
        }
    }

    @Override
    public StreamDefinition getStreamDefinition(String streamId) {
        return sds.get(streamId);
    }

    /**
     * utility to get list of zkServers and zkPort.(It is assumed that zkPort is same for all zkServers as storm-kafka library requires this though it is not efficient)
     */
    private static class ZkServerPortUtils {
        private List<String> zkHosts = new ArrayList<>();
        private Integer zkPort;

        public ZkServerPortUtils(String zkQuorum) {
            String[] zkConnections = zkQuorum.split(",");
            for (String zkConnection : zkConnections) {
                zkHosts.add(zkConnection.split(":")[0]);
            }
            zkPort = Integer.valueOf(zkConnections[0].split(":")[1]);
        }

        public List<String> getZkHosts() {
            return zkHosts;
        }

        public Integer getZkPort() {
            return zkPort;
        }
    }

    /** Best-effort close of a topic's wrapper; failures are logged and ignored. */
    protected void removeKafkaSpout(KafkaSpoutWrapper wrapper) {
        try {
            wrapper.close();
        } catch (Exception e) {
            LOG.error("Close wrapper failed. Ignore and continue!", e);
        }
    }
}
- * - */ -package org.apache.eagle.alert.engine.spout; - -import org.slf4j.Logger; - -/** - * normally this is used in unit test for convenience. - */ -public class CreateTopicUtils { - - private static final Logger LOG = org.slf4j.LoggerFactory.getLogger(CreateTopicUtils.class); - - private static final int partitions = 2; - private static final int replicationFactor = 1; - - public static void ensureTopicReady(String zkQuorum, String topic) { - // ZkConnection zkConnection = new ZkConnection(zkQuorum); - // ZkClient zkClient = new ZkClient(zkQuorum, 10000, 10000, ZKStringSerializer$.MODULE$); - //// ZkUtils zkUtils = new ZkUtils(zkClient, zkConnection, false); - // if (!AdminUtils.topicExists(zkClient, topic)) { - // LOG.info("create topic " + topic + " with partitions " + partitions + ", and replicationFactor " - // + replicationFactor); - // AdminUtils.createTopic(zkClient, topic, partitions, replicationFactor, new Properties()); - // } - } -} http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/ISpoutSpecLCM.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/ISpoutSpecLCM.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/ISpoutSpecLCM.java deleted file mode 100644 index 3c8c99d..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/ISpoutSpecLCM.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * - * * Licensed to the Apache Software Foundation (ASF) under one or more - * * contributor license agreements. See the NOTICE file distributed with - * * this work for additional information regarding copyright ownership. 
- * * The ASF licenses this file to You under the Apache License, Version 2.0 - * * (the "License"); you may not use this file except in compliance with - * * the License. You may obtain a copy of the License at - * * - * * http://www.apache.org/licenses/LICENSE-2.0 - * * - * * Unless required by applicable law or agreed to in writing, software - * * distributed under the License is distributed on an "AS IS" BASIS, - * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * * See the License for the specific language governing permissions and - * * limitations under the License. - * - */ - -package org.apache.eagle.alert.engine.spout; - -import org.apache.eagle.alert.coordination.model.SpoutSpec; -import org.apache.eagle.alert.engine.coordinator.StreamDefinition; - -import java.util.Map; - -/** - * topic to stream metadata lifecycle method - * one topic may spawn multiple streams, the metadata change includes - * 1. add/remove stream - * 2. for a specific stream, groupingstrategy is changed - * ex1, this stream has more alert bolts than before, then this spout would take more traffic - * ex2, this stream has less alert bolts than before, then this spout would take less traffic - */ -public interface ISpoutSpecLCM { - /** - * stream metadata is used for SPOUT to filter traffic and route traffic to following groupby bolts. 
- * - * @param metadata - */ - void update(SpoutSpec metadata, Map<String, StreamDefinition> sds); -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/KafkaMessageIdWrapper.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/KafkaMessageIdWrapper.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/KafkaMessageIdWrapper.java deleted file mode 100644 index 74dea03..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/KafkaMessageIdWrapper.java +++ /dev/null @@ -1,50 +0,0 @@ -/* - * - * * Licensed to the Apache Software Foundation (ASF) under one or more - * * contributor license agreements. See the NOTICE file distributed with - * * this work for additional information regarding copyright ownership. - * * The ASF licenses this file to You under the Apache License, Version 2.0 - * * (the "License"); you may not use this file except in compliance with - * * the License. You may obtain a copy of the License at - * * - * * http://www.apache.org/licenses/LICENSE-2.0 - * * - * * Unless required by applicable law or agreed to in writing, software - * * distributed under the License is distributed on an "AS IS" BASIS, - * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * * See the License for the specific language governing permissions and - * * limitations under the License. - * - */ -package org.apache.eagle.alert.engine.spout; - -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; -import org.apache.eagle.common.DateTimeUtil; - -/** - * Created on 2/18/16. 
- */ -public class KafkaMessageIdWrapper { - public Object id; - public String topic; - public long timestamp; - - public KafkaMessageIdWrapper(Object o) { - this.id = o; - } - - private static final ObjectMapper objectMapper = new ObjectMapper(); - - public String toString() { - try { - return String.format("KafkaMessageIdWrapper[topic=%s, id=%s, timestamp=%s %s]", - topic, - objectMapper.writeValueAsString(id), - DateTimeUtil.millisecondsToHumanDateWithSeconds(timestamp), - DateTimeUtil.CURRENT_TIME_ZONE.getID()); - } catch (JsonProcessingException e) { - throw new IllegalStateException(e); - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/SchemeBuilder.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/SchemeBuilder.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/SchemeBuilder.java deleted file mode 100644 index 440db0a..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/SchemeBuilder.java +++ /dev/null @@ -1,37 +0,0 @@ -/* - * - * * Licensed to the Apache Software Foundation (ASF) under one or more - * * contributor license agreements. See the NOTICE file distributed with - * * this work for additional information regarding copyright ownership. - * * The ASF licenses this file to You under the Apache License, Version 2.0 - * * (the "License"); you may not use this file except in compliance with - * * the License. 
You may obtain a copy of the License at - * * - * * http://www.apache.org/licenses/LICENSE-2.0 - * * - * * Unless required by applicable law or agreed to in writing, software - * * distributed under the License is distributed on an "AS IS" BASIS, - * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * * See the License for the specific language governing permissions and - * * limitations under the License. - * - */ - -package org.apache.eagle.alert.engine.spout; - -import java.util.Map; - - -/** - * All Scheme implementations should have the following conditions - * 1) implement Scheme interface - * 2) has one constructor with topic name as parameter. - */ -public class SchemeBuilder { - - @SuppressWarnings("rawtypes") - public static Object buildFromClsName(String clsName, String topic, Map conf) throws Exception { - Object o = Class.forName(clsName).getConstructor(String.class, Map.class).newInstance(topic, conf); - return o; - } -}
