http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d200737b/flume/src/main/java/org/apache/apex/malhar/flume/interceptor/ColumnFilteringFormattingInterceptor.java ---------------------------------------------------------------------- diff --git a/flume/src/main/java/org/apache/apex/malhar/flume/interceptor/ColumnFilteringFormattingInterceptor.java b/flume/src/main/java/org/apache/apex/malhar/flume/interceptor/ColumnFilteringFormattingInterceptor.java new file mode 100644 index 0000000..bd7e5e0 --- /dev/null +++ b/flume/src/main/java/org/apache/apex/malhar/flume/interceptor/ColumnFilteringFormattingInterceptor.java @@ -0,0 +1,227 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.flume.interceptor; + +import java.util.Arrays; +import java.util.List; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.flume.Context; +import org.apache.flume.Event; +import org.apache.flume.interceptor.Interceptor; + +import com.google.common.base.Strings; +import com.google.common.collect.Lists; +import com.google.common.primitives.Ints; + +import static org.apache.apex.malhar.flume.interceptor.ColumnFilteringFormattingInterceptor.Constants.COLUMNS_FORMATTER; + +/** + * <p>ColumnFilteringFormattingInterceptor class.</p> + * + * @since 0.9.4 + */ +public class ColumnFilteringFormattingInterceptor implements Interceptor +{ + private final byte srcSeparator; + private final byte[][] dstSeparators; + private final byte[] prefix; + private final int maxIndex; + private final int maxColumn; + private final int[] columns; + private final int[] positions; + + private ColumnFilteringFormattingInterceptor(int[] columns, byte srcSeparator, byte[][] dstSeparators, byte[] prefix) + { + this.columns = columns; + + int tempMaxColumn = Integer.MIN_VALUE; + for (int column : columns) { + if (column > tempMaxColumn) { + tempMaxColumn = column; + } + } + maxIndex = tempMaxColumn; + maxColumn = tempMaxColumn + 1; + positions = new int[maxColumn + 1]; + this.srcSeparator = srcSeparator; + this.dstSeparators = dstSeparators; + this.prefix = prefix; + } + + @Override + public void initialize() + { + /* no-op */ + } + + @Override + public Event intercept(Event event) + { + byte[] body = event.getBody(); + if (body == null) { + return event; + } + + final int length = body.length; + + /* store positions of character after the separators */ + int i = 0; + int index = 0; + while (i < length) { + if (body[i++] == srcSeparator) { + positions[++index] = i; + if (index >= maxIndex) { + break; + } + } + } + + int nextVirginIndex; + boolean separatorAtEnd = true; + if (i == length && index < maxColumn) { + nextVirginIndex = index + 2; + 
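// the scan reached the end of the body before finding a terminating separator for every
// requested column, so mark the end of the body as the end of the last field that was found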
positions[nextVirginIndex - 1] = length; + separatorAtEnd = length > 0 ? body[length - 1] == srcSeparator : false; + } else { + nextVirginIndex = index + 1; + } + + int newArrayLen = prefix.length; + for (i = columns.length; i-- > 0; ) { + int column = columns[i]; + int len = positions[column + 1] - positions[column]; + if (len > 0) { + if (positions[column + 1] == length && !separatorAtEnd) { + newArrayLen += len; + } else { + newArrayLen += len - 1; + } + } + newArrayLen += dstSeparators[i].length; + } + + byte[] newBody = new byte[newArrayLen]; + int newOffset = 0; + if (prefix.length > 0) { + System.arraycopy(prefix, 0, newBody, 0, prefix.length); + newOffset += prefix.length; + } + int dstSeparatorsIdx = 0; + for (int column : columns) { + int len = positions[column + 1] - positions[column]; + byte[] separator = dstSeparators[dstSeparatorsIdx++]; + if (len > 0) { + System.arraycopy(body, positions[column], newBody, newOffset, len); + newOffset += len; + if (newBody[newOffset - 1] == srcSeparator) { + newOffset--; + } + } + System.arraycopy(separator, 0, newBody, newOffset, separator.length); + newOffset += separator.length; + } + event.setBody(newBody); + Arrays.fill(positions, 1, nextVirginIndex, 0); + return event; + } + + @Override + public List<Event> intercept(List<Event> events) + { + for (Event event : events) { + intercept(event); + } + return events; + } + + @Override + public void close() + { + } + + public static class Builder implements Interceptor.Builder + { + private int[] columns; + private byte srcSeparator; + private byte[][] dstSeparators; + private byte[] prefix; + + @Override + public Interceptor build() + { + return new ColumnFilteringFormattingInterceptor(columns, srcSeparator, dstSeparators, prefix); + } + + @Override + public void configure(Context context) + { + String formatter = context.getString(COLUMNS_FORMATTER); + if (Strings.isNullOrEmpty(formatter)) { + throw new IllegalArgumentException("This interceptor requires columns format to be specified!"); + } + List<String> lSeparators = Lists.newArrayList(); + List<Integer> lColumns = Lists.newArrayList(); + Pattern colPat = Pattern.compile("\\{\\d+?\\}"); + Matcher matcher = colPat.matcher(formatter); + int separatorStart = 0; + String lPrefix = ""; + while (matcher.find()) { + String col = matcher.group(); + lColumns.add(Integer.parseInt(col.substring(1, col.length() - 1))); + if (separatorStart == 0 && matcher.start() > 0) { + lPrefix = formatter.substring(0, matcher.start()); + } else if (separatorStart > 0) { + lSeparators.add(formatter.substring(separatorStart, matcher.start())); + } + + separatorStart = matcher.end(); + } + if (separatorStart < formatter.length()) { + lSeparators.add(formatter.substring(separatorStart, formatter.length())); + } + columns = Ints.toArray(lColumns); + byte[] emptyStringBytes = "".getBytes(); + + dstSeparators = new byte[columns.length][]; + + for (int i = 0; i < columns.length; i++) { + if (i < lSeparators.size()) { + dstSeparators[i] = lSeparators.get(i).getBytes(); + } else { + dstSeparators[i] = emptyStringBytes; + } + } + srcSeparator = context.getInteger(ColumnFilteringInterceptor.Constants.SRC_SEPARATOR, (int) ColumnFilteringInterceptor.Constants.SRC_SEPARATOR_DFLT).byteValue(); + this.prefix = lPrefix.getBytes(); + } + } + + public static class Constants extends ColumnFilteringInterceptor.Constants + { + public static final String COLUMNS_FORMATTER = "columnsFormatter"; + } + + private static final Logger logger = 
LoggerFactory.getLogger(ColumnFilteringFormattingInterceptor.class); + +}
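Editorial illustration of the Builder above (the formatter value and class name are made up, not part of the commit): configure() derives everything from the single "columnsFormatter" property — text before the first {n} token becomes the prefix, text between consecutive tokens becomes the destination separator emitted after each referenced column, and any trailing text becomes the final separator. A minimal sketch, assuming flume-ng-core is on the classpath:

    import org.apache.flume.Context;
    import org.apache.flume.interceptor.Interceptor;

    import org.apache.apex.malhar.flume.interceptor.ColumnFilteringFormattingInterceptor;

    public class FormatterConfigSketch
    {
      public static void main(String[] args)
      {
        Context context = new Context();
        // Hypothetical formatter; configure() parses it into
        // prefix "apex[", columns {1, 2} and destination separators {"]-[", "]"}.
        context.put("columnsFormatter", "apex[{1}]-[{2}]");
        // Optional: the byte on which source fields are split; defaults to 2 when omitted.
        context.put("srcSeparator", "2");

        Interceptor.Builder builder = new ColumnFilteringFormattingInterceptor.Builder();
        builder.configure(context);
        Interceptor interceptor = builder.build();
        interceptor.initialize();
      }
    }

The plain ColumnFilteringInterceptor in the next file takes the same srcSeparator/dstSeparator properties but a space-separated "columns" list instead of a formatter string.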
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d200737b/flume/src/main/java/org/apache/apex/malhar/flume/interceptor/ColumnFilteringInterceptor.java ---------------------------------------------------------------------- diff --git a/flume/src/main/java/org/apache/apex/malhar/flume/interceptor/ColumnFilteringInterceptor.java b/flume/src/main/java/org/apache/apex/malhar/flume/interceptor/ColumnFilteringInterceptor.java new file mode 100644 index 0000000..f0de5e0 --- /dev/null +++ b/flume/src/main/java/org/apache/apex/malhar/flume/interceptor/ColumnFilteringInterceptor.java @@ -0,0 +1,205 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.flume.interceptor; + +import java.util.Arrays; +import java.util.List; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.flume.Context; +import org.apache.flume.Event; +import org.apache.flume.interceptor.Interceptor; + +import static org.apache.apex.malhar.flume.interceptor.ColumnFilteringInterceptor.Constants.COLUMNS; +import static org.apache.apex.malhar.flume.interceptor.ColumnFilteringInterceptor.Constants.DST_SEPARATOR; +import static org.apache.apex.malhar.flume.interceptor.ColumnFilteringInterceptor.Constants.DST_SEPARATOR_DFLT; +import static org.apache.apex.malhar.flume.interceptor.ColumnFilteringInterceptor.Constants.SRC_SEPARATOR; +import static org.apache.apex.malhar.flume.interceptor.ColumnFilteringInterceptor.Constants.SRC_SEPARATOR_DFLT; + +/** + * <p>ColumnFilteringInterceptor class.</p> + * + * @since 0.9.4 + */ +public class ColumnFilteringInterceptor implements Interceptor +{ + private final byte srcSeparator; + private final byte dstSeparator; + + private final int maxIndex; + private final int maxColumn; + private final int[] columns; + private final int[] positions; + + private ColumnFilteringInterceptor(int[] columns, byte srcSeparator, byte dstSeparator) + { + this.columns = columns; + + int tempMaxColumn = Integer.MIN_VALUE; + for (int column: columns) { + if (column > tempMaxColumn) { + tempMaxColumn = column; + } + } + maxIndex = tempMaxColumn; + maxColumn = tempMaxColumn + 1; + positions = new int[maxColumn + 1]; + + this.srcSeparator = srcSeparator; + this.dstSeparator = dstSeparator; + } + + @Override + public void initialize() + { + /* no-op */ + } + + @Override + public Event intercept(Event event) + { + byte[] body = event.getBody(); + if (body == null) { + return event; + } + + final int length = body.length; + + /* store positions of character after the separators */ + int i = 0; + int index = 0; + while (i < length) { + if (body[i++] == srcSeparator) { + positions[++index] = i; + if (index >= maxIndex) { + break; + } + } + } + + int nextVirginIndex; + boolean separatorTerminated; + if (i == 
length && index < maxColumn) { + nextVirginIndex = index + 2; + positions[nextVirginIndex - 1] = length; + separatorTerminated = length > 0 ? body[length - 1] != srcSeparator : false; + } else { + nextVirginIndex = index + 1; + separatorTerminated = true; + } + + int newArrayLen = 0; + for (i = columns.length; i-- > 0;) { + int column = columns[i]; + int len = positions[column + 1] - positions[column]; + if (len <= 0) { + newArrayLen++; + } else { + if (separatorTerminated && positions[column + 1] == length) { + newArrayLen++; + } + newArrayLen += len; + } + } + + byte[] newbody = new byte[newArrayLen]; + int newoffset = 0; + for (int column: columns) { + int len = positions[column + 1] - positions[column]; + if (len > 0) { + System.arraycopy(body, positions[column], newbody, newoffset, len); + newoffset += len; + if (newbody[newoffset - 1] == srcSeparator) { + newbody[newoffset - 1] = dstSeparator; + } else { + newbody[newoffset++] = dstSeparator; + } + } else { + newbody[newoffset++] = dstSeparator; + } + } + + event.setBody(newbody); + Arrays.fill(positions, 1, nextVirginIndex, 0); + return event; + } + + @Override + public List<Event> intercept(List<Event> events) + { + for (Event event: events) { + intercept(event); + } + return events; + } + + @Override + public void close() + { + } + + public static class Builder implements Interceptor.Builder + { + private int[] columns; + private byte srcSeparator; + private byte dstSeparator; + + @Override + public Interceptor build() + { + return new ColumnFilteringInterceptor(columns, srcSeparator, dstSeparator); + } + + @Override + public void configure(Context context) + { + String sColumns = context.getString(COLUMNS); + if (sColumns == null || sColumns.trim().isEmpty()) { + throw new Error("This interceptor requires filtered columns to be specified!"); + } + + String[] parts = sColumns.split(" "); + columns = new int[parts.length]; + for (int i = parts.length; i-- > 0;) { + columns[i] = Integer.parseInt(parts[i]); + } + + srcSeparator = context.getInteger(SRC_SEPARATOR, (int)SRC_SEPARATOR_DFLT).byteValue(); + dstSeparator = context.getInteger(DST_SEPARATOR, (int)DST_SEPARATOR_DFLT).byteValue(); + } + + } + + @SuppressWarnings("ClassMayBeInterface") /* adhering to flume until i understand it completely */ + + public static class Constants + { + public static final String SRC_SEPARATOR = "srcSeparator"; + public static final byte SRC_SEPARATOR_DFLT = 2; + + public static final String DST_SEPARATOR = "dstSeparator"; + public static final byte DST_SEPARATOR_DFLT = 1; + + public static final String COLUMNS = "columns"; + } + + private static final Logger logger = LoggerFactory.getLogger(ColumnFilteringInterceptor.class); +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d200737b/flume/src/main/java/org/apache/apex/malhar/flume/operator/AbstractFlumeInputOperator.java ---------------------------------------------------------------------- diff --git a/flume/src/main/java/org/apache/apex/malhar/flume/operator/AbstractFlumeInputOperator.java b/flume/src/main/java/org/apache/apex/malhar/flume/operator/AbstractFlumeInputOperator.java new file mode 100644 index 0000000..da1a8aa --- /dev/null +++ b/flume/src/main/java/org/apache/apex/malhar/flume/operator/AbstractFlumeInputOperator.java @@ -0,0 +1,759 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.flume.operator; + +import java.io.IOException; +import java.io.Serializable; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.concurrent.ArrayBlockingQueue; + +import javax.validation.constraints.Min; +import javax.validation.constraints.NotNull; + +import org.apache.apex.malhar.flume.discovery.Discovery; +import org.apache.apex.malhar.flume.sink.Server; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.flume.Event; + +import com.datatorrent.api.Context; +import com.datatorrent.api.Context.OperatorContext; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.DefaultPartition; +import com.datatorrent.api.InputOperator; +import com.datatorrent.api.Operator; +import com.datatorrent.api.Partitioner; +import com.datatorrent.api.Stats.OperatorStats; +import com.datatorrent.api.StreamCodec; +import org.apache.apex.malhar.flume.discovery.ZKAssistedDiscovery; +import com.datatorrent.netlet.AbstractLengthPrependerClient; +import com.datatorrent.netlet.DefaultEventLoop; +import com.datatorrent.netlet.util.Slice; + +import static java.lang.Thread.sleep; + +/** + * <p> + * Abstract AbstractFlumeInputOperator class.</p> + * + * @param <T> Type of the output payload. 
+ * @since 0.9.2 + */ +public abstract class AbstractFlumeInputOperator<T> + implements InputOperator, Operator.ActivationListener<OperatorContext>, Operator.IdleTimeHandler, + Operator.CheckpointListener, Partitioner<AbstractFlumeInputOperator<T>> +{ + public final transient DefaultOutputPort<T> output = new DefaultOutputPort<T>(); + public final transient DefaultOutputPort<Slice> drop = new DefaultOutputPort<Slice>(); + @NotNull + private String[] connectionSpecs; + @NotNull + private StreamCodec<Event> codec; + private final ArrayList<RecoveryAddress> recoveryAddresses; + @SuppressWarnings("FieldMayBeFinal") // it's not final because that mucks with the serialization somehow + private transient ArrayBlockingQueue<Slice> handoverBuffer; + private transient int idleCounter; + private transient int eventCounter; + private transient DefaultEventLoop eventloop; + private transient volatile boolean connected; + private transient OperatorContext context; + private transient Client client; + private transient long windowId; + private transient byte[] address; + @Min(0) + private long maxEventsPerSecond; + //This is calculated from maxEventsPerSecond, App window count and streaming window size + private transient long maxEventsPerWindow; + + public AbstractFlumeInputOperator() + { + handoverBuffer = new ArrayBlockingQueue<Slice>(1024 * 5); + connectionSpecs = new String[0]; + recoveryAddresses = new ArrayList<RecoveryAddress>(); + maxEventsPerSecond = Long.MAX_VALUE; + } + + @Override + public void setup(OperatorContext context) + { + long windowDurationMillis = context.getValue(OperatorContext.APPLICATION_WINDOW_COUNT) * + context.getValue(Context.DAGContext.STREAMING_WINDOW_SIZE_MILLIS); + maxEventsPerWindow = (long)(windowDurationMillis / 1000.0 * maxEventsPerSecond); + logger.debug("max-events per-second {} per-window {}", maxEventsPerSecond, maxEventsPerWindow); + + try { + eventloop = new DefaultEventLoop("EventLoop-" + context.getId()); + eventloop.start(); + } catch (IOException ex) { + throw new RuntimeException(ex); + } + } + + @Override + @SuppressWarnings({"unchecked"}) + public void activate(OperatorContext ctx) + { + if (connectionSpecs.length == 0) { + logger.info("Discovered zero DTFlumeSink"); + } else if (connectionSpecs.length == 1) { + for (String connectAddresse: connectionSpecs) { + logger.debug("Connection spec is {}", connectAddresse); + String[] parts = connectAddresse.split(":"); + eventloop.connect(new InetSocketAddress(parts[1], Integer.parseInt(parts[2])), client = new Client(parts[0])); + } + } else { + throw new IllegalArgumentException( + String.format("A physical %s operator cannot connect to more than 1 addresses!", + this.getClass().getSimpleName())); + } + + context = ctx; + } + + @Override + public void beginWindow(long windowId) + { + this.windowId = windowId; + idleCounter = 0; + eventCounter = 0; + } + + @Override + public void emitTuples() + { + int i = handoverBuffer.size(); + if (i > 0 && eventCounter < maxEventsPerWindow) { + + while (--i > 0 && eventCounter < maxEventsPerWindow - 1) { + final Slice slice = handoverBuffer.poll(); + slice.offset += 8; + slice.length -= 8; + T convert = convert((Event)codec.fromByteArray(slice)); + if (convert == null) { + drop.emit(slice); + } else { + output.emit(convert); + } + eventCounter++; + } + + final Slice slice = handoverBuffer.poll(); + slice.offset += 8; + slice.length -= 8; + T convert = convert((Event)codec.fromByteArray(slice)); + if (convert == null) { + drop.emit(slice); + } else { + 
output.emit(convert); + } + eventCounter++; + + address = Arrays.copyOfRange(slice.buffer, slice.offset - 8, slice.offset); + } + } + + @Override + public void endWindow() + { + if (connected) { + byte[] array = new byte[Server.Request.FIXED_SIZE]; + + array[0] = Server.Command.WINDOWED.getOrdinal(); + Server.writeInt(array, 1, eventCounter); + Server.writeInt(array, 5, idleCounter); + Server.writeLong(array, Server.Request.TIME_OFFSET, System.currentTimeMillis()); + + logger.debug("wrote {} with eventCounter = {} and idleCounter = {}", Server.Command.WINDOWED, eventCounter, idleCounter); + client.write(array); + } + + if (address != null) { + RecoveryAddress rAddress = new RecoveryAddress(); + rAddress.address = address; + address = null; + rAddress.windowId = windowId; + recoveryAddresses.add(rAddress); + } + } + + @Override + public void deactivate() + { + if (connected) { + eventloop.disconnect(client); + } + context = null; + } + + @Override + public void teardown() + { + eventloop.stop(); + eventloop = null; + } + + @Override + public void handleIdleTime() + { + idleCounter++; + try { + sleep(context.getValue(OperatorContext.SPIN_MILLIS)); + } catch (InterruptedException ex) { + throw new RuntimeException(ex); + } + } + + public abstract T convert(Event event); + + /** + * @return the connectAddress + */ + public String[] getConnectAddresses() + { + return connectionSpecs.clone(); + } + + /** + * @param specs - sinkid:host:port specification of all the sinks. + */ + public void setConnectAddresses(String[] specs) + { + this.connectionSpecs = specs.clone(); + } + + /** + * @return the codec + */ + public StreamCodec<Event> getCodec() + { + return codec; + } + + /** + * @param codec the codec to set + */ + public void setCodec(StreamCodec<Event> codec) + { + this.codec = codec; + } + + private static class RecoveryAddress implements Serializable + { + long windowId; + byte[] address; + + @Override + public String toString() + { + return "RecoveryAddress{" + "windowId=" + windowId + ", address=" + Arrays.toString(address) + '}'; + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (!(o instanceof RecoveryAddress)) { + return false; + } + + RecoveryAddress that = (RecoveryAddress)o; + + if (windowId != that.windowId) { + return false; + } + return Arrays.equals(address, that.address); + } + + @Override + public int hashCode() + { + int result = (int)(windowId ^ (windowId >>> 32)); + result = 31 * result + (address != null ? 
Arrays.hashCode(address) : 0); + return result; + } + + private static final long serialVersionUID = 201312021432L; + } + + @Override + public void checkpointed(long windowId) + { + /* dont do anything */ + } + + @Override + public void committed(long windowId) + { + if (!connected) { + return; + } + + synchronized (recoveryAddresses) { + byte[] addr = null; + + Iterator<RecoveryAddress> iterator = recoveryAddresses.iterator(); + while (iterator.hasNext()) { + RecoveryAddress ra = iterator.next(); + if (ra.windowId > windowId) { + break; + } + + iterator.remove(); + if (ra.address != null) { + addr = ra.address; + } + } + + if (addr != null) { + /* + * Make sure that we store the last valid address processed + */ + if (recoveryAddresses.isEmpty()) { + RecoveryAddress ra = new RecoveryAddress(); + ra.address = addr; + recoveryAddresses.add(ra); + } + + int arraySize = 1/* for the type of the message */ + + 8 /* for the location to commit */ + + 8 /* for storing the current time stamp*/; + byte[] array = new byte[arraySize]; + + array[0] = Server.Command.COMMITTED.getOrdinal(); + System.arraycopy(addr, 0, array, 1, 8); + Server.writeLong(array, Server.Request.TIME_OFFSET, System.currentTimeMillis()); + logger.debug("wrote {} with recoveryOffset = {}", Server.Command.COMMITTED, Arrays.toString(addr)); + client.write(array); + } + } + } + + @Override + public Collection<Partition<AbstractFlumeInputOperator<T>>> definePartitions( + Collection<Partition<AbstractFlumeInputOperator<T>>> partitions, PartitioningContext context) + { + Collection<Discovery.Service<byte[]>> discovered = discoveredFlumeSinks.get(); + if (discovered == null) { + return partitions; + } + + HashMap<String, ArrayList<RecoveryAddress>> allRecoveryAddresses = abandonedRecoveryAddresses.get(); + ArrayList<String> allConnectAddresses = new ArrayList<String>(partitions.size()); + for (Partition<AbstractFlumeInputOperator<T>> partition: partitions) { + String[] lAddresses = partition.getPartitionedInstance().connectionSpecs; + allConnectAddresses.addAll(Arrays.asList(lAddresses)); + for (int i = lAddresses.length; i-- > 0;) { + String[] parts = lAddresses[i].split(":", 2); + allRecoveryAddresses.put(parts[0], partition.getPartitionedInstance().recoveryAddresses); + } + } + + HashMap<String, String> connections = new HashMap<String, String>(discovered.size()); + for (Discovery.Service<byte[]> service: discovered) { + String previousSpec = connections.get(service.getId()); + String newspec = service.getId() + ':' + service.getHost() + ':' + service.getPort(); + if (previousSpec == null) { + connections.put(service.getId(), newspec); + } else { + boolean found = false; + for (ConnectionStatus cs: partitionedInstanceStatus.get().values()) { + if (previousSpec.equals(cs.spec) && !cs.connected) { + connections.put(service.getId(), newspec); + found = true; + break; + } + } + + if (!found) { + logger.warn("2 sinks found with the same id: {} and {}... 
Ignoring previous.", previousSpec, newspec); + connections.put(service.getId(), newspec); + } + } + } + + for (int i = allConnectAddresses.size(); i-- > 0;) { + String[] parts = allConnectAddresses.get(i).split(":"); + String connection = connections.remove(parts[0]); + if (connection == null) { + allConnectAddresses.remove(i); + } else { + allConnectAddresses.set(i, connection); + } + } + + allConnectAddresses.addAll(connections.values()); + + partitions.clear(); + try { + if (allConnectAddresses.isEmpty()) { + /* return at least one of them; otherwise stram becomes grumpy */ + @SuppressWarnings("unchecked") + AbstractFlumeInputOperator<T> operator = getClass().newInstance(); + operator.setCodec(codec); + operator.setMaxEventsPerSecond(maxEventsPerSecond); + for (ArrayList<RecoveryAddress> lRecoveryAddresses: allRecoveryAddresses.values()) { + operator.recoveryAddresses.addAll(lRecoveryAddresses); + } + operator.connectionSpecs = new String[allConnectAddresses.size()]; + for (int i = connectionSpecs.length; i-- > 0;) { + connectionSpecs[i] = allConnectAddresses.get(i); + } + + partitions.add(new DefaultPartition<AbstractFlumeInputOperator<T>>(operator)); + } else { + long maxEventsPerSecondPerOperator = maxEventsPerSecond / allConnectAddresses.size(); + for (int i = allConnectAddresses.size(); i-- > 0;) { + @SuppressWarnings("unchecked") + AbstractFlumeInputOperator<T> operator = getClass().newInstance(); + operator.setCodec(codec); + operator.setMaxEventsPerSecond(maxEventsPerSecondPerOperator); + String connectAddress = allConnectAddresses.get(i); + operator.connectionSpecs = new String[] {connectAddress}; + + String[] parts = connectAddress.split(":", 2); + ArrayList<RecoveryAddress> remove = allRecoveryAddresses.remove(parts[0]); + if (remove != null) { + operator.recoveryAddresses.addAll(remove); + } + + partitions.add(new DefaultPartition<AbstractFlumeInputOperator<T>>(operator)); + } + } + } catch (IllegalAccessException ex) { + throw new RuntimeException(ex); + } catch (InstantiationException ex) { + throw new RuntimeException(ex); + } + + logger.debug("Requesting partitions: {}", partitions); + return partitions; + } + + @Override + public void partitioned(Map<Integer, Partition<AbstractFlumeInputOperator<T>>> partitions) + { + logger.debug("Partitioned Map: {}", partitions); + HashMap<Integer, ConnectionStatus> map = partitionedInstanceStatus.get(); + map.clear(); + for (Entry<Integer, Partition<AbstractFlumeInputOperator<T>>> entry: partitions.entrySet()) { + if (map.containsKey(entry.getKey())) { + // what can be done here? + } else { + map.put(entry.getKey(), null); + } + } + } + + @Override + public String toString() + { + return "AbstractFlumeInputOperator{" + "connected=" + connected + ", connectionSpecs=" + + (connectionSpecs.length == 0 ? 
"empty" : connectionSpecs[0]) + ", recoveryAddresses=" + recoveryAddresses + '}'; + } + + class Client extends AbstractLengthPrependerClient + { + private final String id; + + Client(String id) + { + this.id = id; + } + + @Override + public void onMessage(byte[] buffer, int offset, int size) + { + try { + handoverBuffer.put(new Slice(buffer, offset, size)); + } catch (InterruptedException ex) { + handleException(ex, eventloop); + } + } + + @Override + public void connected() + { + super.connected(); + + byte[] address; + synchronized (recoveryAddresses) { + if (recoveryAddresses.size() > 0) { + address = recoveryAddresses.get(recoveryAddresses.size() - 1).address; + } else { + address = new byte[8]; + } + } + + int len = 1 /* for the message type SEEK */ + + 8 /* for the address */ + + 8 /* for storing the current time stamp*/; + + byte[] array = new byte[len]; + array[0] = Server.Command.SEEK.getOrdinal(); + System.arraycopy(address, 0, array, 1, 8); + Server.writeLong(array, 9, System.currentTimeMillis()); + write(array); + + connected = true; + ConnectionStatus connectionStatus = new ConnectionStatus(); + connectionStatus.connected = true; + connectionStatus.spec = connectionSpecs[0]; + OperatorContext ctx = context; + synchronized (ctx) { + logger.debug("{} Submitting ConnectionStatus = {}", AbstractFlumeInputOperator.this, connectionStatus); + context.setCounters(connectionStatus); + } + } + + @Override + public void disconnected() + { + connected = false; + ConnectionStatus connectionStatus = new ConnectionStatus(); + connectionStatus.connected = false; + connectionStatus.spec = connectionSpecs[0]; + OperatorContext ctx = context; + synchronized (ctx) { + logger.debug("{} Submitting ConnectionStatus = {}", AbstractFlumeInputOperator.this, connectionStatus); + context.setCounters(connectionStatus); + } + super.disconnected(); + } + + } + + public static class ZKStatsListner extends ZKAssistedDiscovery implements com.datatorrent.api.StatsListener, + Serializable + { + /* + * In the current design, one input operator is able to connect + * to only one flume adapter. Sometime in future, we should support + * any number of input operators connecting to any number of flume + * sinks and vice a versa. + * + * Until that happens the following map should be sufficient to + * keep track of which input operator is connected to which flume sink. 
+ */ + long intervalMillis; + private final Response response; + private transient long nextMillis; + + public ZKStatsListner() + { + intervalMillis = 60 * 1000L; + response = new Response(); + } + + @Override + public Response processStats(BatchedOperatorStats stats) + { + final HashMap<Integer, ConnectionStatus> map = partitionedInstanceStatus.get(); + response.repartitionRequired = false; + + Object lastStat = null; + List<OperatorStats> lastWindowedStats = stats.getLastWindowedStats(); + for (OperatorStats os: lastWindowedStats) { + if (os.counters != null) { + lastStat = os.counters; + logger.debug("Received custom stats = {}", lastStat); + } + } + + if (lastStat instanceof ConnectionStatus) { + ConnectionStatus cs = (ConnectionStatus)lastStat; + map.put(stats.getOperatorId(), cs); + if (!cs.connected) { + logger.debug("setting repatitioned = true because of lastStat = {}", lastStat); + response.repartitionRequired = true; + } + } + + if (System.currentTimeMillis() >= nextMillis) { + logger.debug("nextMillis = {}", nextMillis); + try { + super.setup(null); + Collection<Discovery.Service<byte[]>> addresses; + try { + addresses = discover(); + } finally { + super.teardown(); + } + AbstractFlumeInputOperator.discoveredFlumeSinks.set(addresses); + logger.debug("\ncurrent map = {}\ndiscovered sinks = {}", map, addresses); + switch (addresses.size()) { + case 0: + response.repartitionRequired = map.size() != 1; + break; + + default: + if (addresses.size() == map.size()) { + for (ConnectionStatus value: map.values()) { + if (value == null || !value.connected) { + response.repartitionRequired = true; + break; + } + } + } else { + response.repartitionRequired = true; + } + break; + } + } catch (Error er) { + throw er; + } catch (Throwable cause) { + logger.warn("Unable to discover services, using values from last successful discovery", cause); + } finally { + nextMillis = System.currentTimeMillis() + intervalMillis; + logger.debug("Proposed NextMillis = {}", nextMillis); + } + } + + return response; + } + + /** + * @return the intervalMillis + */ + public long getIntervalMillis() + { + return intervalMillis; + } + + /** + * @param intervalMillis the intervalMillis to set + */ + public void setIntervalMillis(long intervalMillis) + { + this.intervalMillis = intervalMillis; + } + + private static final long serialVersionUID = 201312241646L; + } + + public static class ConnectionStatus implements Serializable + { + int id; + String spec; + boolean connected; + + @Override + public int hashCode() + { + return spec.hashCode(); + } + + @Override + public boolean equals(Object obj) + { + if (obj == null) { + return false; + } + if (getClass() != obj.getClass()) { + return false; + } + final ConnectionStatus other = (ConnectionStatus)obj; + return spec == null ? 
other.spec == null : spec.equals(other.spec); + } + + @Override + public String toString() + { + return "ConnectionStatus{" + "id=" + id + ", spec=" + spec + ", connected=" + connected + '}'; + } + + private static final long serialVersionUID = 201312261615L; + } + + private static final transient ThreadLocal<HashMap<Integer, ConnectionStatus>> partitionedInstanceStatus = + new ThreadLocal<HashMap<Integer, ConnectionStatus>>() + { + @Override + protected HashMap<Integer, ConnectionStatus> initialValue() + { + return new HashMap<Integer, ConnectionStatus>(); + } + + }; + /** + * When a sink goes away and a replacement sink is not found, we stash the recovery addresses associated + * with the sink in a hope that the new sink may show up in near future. + */ + private static final transient ThreadLocal<HashMap<String, ArrayList<RecoveryAddress>>> abandonedRecoveryAddresses = + new ThreadLocal<HashMap<String, ArrayList<RecoveryAddress>>>() + { + @Override + protected HashMap<String, ArrayList<RecoveryAddress>> initialValue() + { + return new HashMap<String, ArrayList<RecoveryAddress>>(); + } + + }; + private static final transient ThreadLocal<Collection<Discovery.Service<byte[]>>> discoveredFlumeSinks = + new ThreadLocal<Collection<Discovery.Service<byte[]>>>(); + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (!(o instanceof AbstractFlumeInputOperator)) { + return false; + } + + AbstractFlumeInputOperator<?> that = (AbstractFlumeInputOperator<?>)o; + + if (!Arrays.equals(connectionSpecs, that.connectionSpecs)) { + return false; + } + return recoveryAddresses.equals(that.recoveryAddresses); + + } + + @Override + public int hashCode() + { + int result = connectionSpecs != null ? Arrays.hashCode(connectionSpecs) : 0; + result = 31 * result + (recoveryAddresses.hashCode()); + return result; + } + + public void setMaxEventsPerSecond(long maxEventsPerSecond) + { + this.maxEventsPerSecond = maxEventsPerSecond; + } + + public long getMaxEventsPerSecond() + { + return maxEventsPerSecond; + } + + private static final Logger logger = LoggerFactory.getLogger(AbstractFlumeInputOperator.class); +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d200737b/flume/src/main/java/org/apache/apex/malhar/flume/sink/DTFlumeSink.java ---------------------------------------------------------------------- diff --git a/flume/src/main/java/org/apache/apex/malhar/flume/sink/DTFlumeSink.java b/flume/src/main/java/org/apache/apex/malhar/flume/sink/DTFlumeSink.java new file mode 100644 index 0000000..306ce13 --- /dev/null +++ b/flume/src/main/java/org/apache/apex/malhar/flume/sink/DTFlumeSink.java @@ -0,0 +1,572 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.apex.malhar.flume.sink; + +import java.io.IOError; +import java.io.IOException; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.ServiceConfigurationError; + +import org.apache.apex.malhar.flume.discovery.Discovery; +import org.apache.apex.malhar.flume.storage.EventCodec; +import org.apache.apex.malhar.flume.storage.Storage; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.flume.Context; +import org.apache.flume.Event; +import org.apache.flume.EventDeliveryException; +import org.apache.flume.Transaction; +import org.apache.flume.conf.Configurable; +import org.apache.flume.sink.AbstractSink; + +import com.datatorrent.api.Component; +import com.datatorrent.api.StreamCodec; +import org.apache.apex.malhar.flume.sink.Server.Client; +import org.apache.apex.malhar.flume.sink.Server.Request; +import com.datatorrent.netlet.DefaultEventLoop; +import com.datatorrent.netlet.NetletThrowable; +import com.datatorrent.netlet.NetletThrowable.NetletRuntimeException; +import com.datatorrent.netlet.util.Slice; + +/** + * DTFlumeSink is a flume sink developed to ingest the data into DataTorrent DAG + * from flume. It's essentially a flume sink which acts as a server capable of + * talking to one client at a time. The client for this server is AbstractFlumeInputOperator. + * <p /> + * <experimental>DTFlumeSink auto adjusts the rate at which it consumes the data from channel to + * match the throughput of the DAG.</experimental> + * <p /> + * The properties you can set on the DTFlumeSink are: <br /> + * id - string unique value identifying this sink <br /> + * hostname - string value indicating the fqdn or ip address of the interface on which the server should listen <br /> + * port - integer value indicating the numeric port to which the server should bind <br /> + * sleepMillis - integer value indicating the number of milliseconds the process should sleep when there are no events + * before checking for next event again <br /> + * throughputAdjustmentPercent - integer value indicating by what percentage the flume transaction size should be + * adjusted upward or downward at a time <br /> + * minimumEventsPerTransaction - integer value indicating the minimum number of events per transaction <br /> + * maximumEventsPerTransaction - integer value indicating the maximum number of events per transaction. 
This value can + * not be more than channel's transaction capacity.<br /> + * + * @since 0.9.2 + */ +public class DTFlumeSink extends AbstractSink implements Configurable +{ + private static final String HOSTNAME_STRING = "hostname"; + private static final String HOSTNAME_DEFAULT = "locahost"; + private static final long ACCEPTED_TOLERANCE = 20000; + private DefaultEventLoop eventloop; + private Server server; + private int outstandingEventsCount; + private int lastConsumedEventsCount; + private int idleCount; + private byte[] playback; + private Client client; + private String hostname; + private int port; + private String id; + private long acceptedTolerance; + private long sleepMillis; + private double throughputAdjustmentFactor; + private int minimumEventsPerTransaction; + private int maximumEventsPerTransaction; + private long commitEventTimeoutMillis; + private transient long lastCommitEventTimeMillis; + private Storage storage; + Discovery<byte[]> discovery; + StreamCodec<Event> codec; + /* Begin implementing Flume Sink interface */ + + @Override + @SuppressWarnings({"BroadCatchBlock", "TooBroadCatch", "UseSpecificCatch", "SleepWhileInLoop"}) + public Status process() throws EventDeliveryException + { + Slice slice; + synchronized (server.requests) { + for (Request r : server.requests) { + logger.debug("found {}", r); + switch (r.type) { + case SEEK: + lastCommitEventTimeMillis = System.currentTimeMillis(); + slice = r.getAddress(); + playback = storage.retrieve(Arrays.copyOfRange(slice.buffer, slice.offset, slice.offset + slice.length)); + client = r.client; + break; + + case COMMITTED: + lastCommitEventTimeMillis = System.currentTimeMillis(); + slice = r.getAddress(); + storage.clean(Arrays.copyOfRange(slice.buffer, slice.offset, slice.offset + slice.length)); + break; + + case CONNECTED: + logger.debug("Connected received, ignoring it!"); + break; + + case DISCONNECTED: + if (r.client == client) { + client = null; + outstandingEventsCount = 0; + } + break; + + case WINDOWED: + lastConsumedEventsCount = r.getEventCount(); + idleCount = r.getIdleCount(); + outstandingEventsCount -= lastConsumedEventsCount; + break; + + case SERVER_ERROR: + throw new IOError(null); + + default: + logger.debug("Cannot understand the request {}", r); + break; + } + } + + server.requests.clear(); + } + + if (client == null) { + logger.info("No client expressed interest yet to consume the events."); + return Status.BACKOFF; + } else if (System.currentTimeMillis() - lastCommitEventTimeMillis > commitEventTimeoutMillis) { + logger.info("Client has not processed the workload given for the last {} milliseconds, so backing off.", + System.currentTimeMillis() - lastCommitEventTimeMillis); + return Status.BACKOFF; + } + + int maxTuples; + // the following logic needs to be fixed... this is a quick put together. 
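    // To illustrate the adjustment with hypothetical numbers: with
    // throughputAdjustmentPercent = 5 (factor 0.05) and lastConsumedEventsCount = 1000,
    // two idle cycles grow the next batch to (1 + 0.05 * 2) * 1000 = 1100 events,
    // while a backlog larger than the last consumed count shrinks it to
    // (1 - 0.05) * 1000 = 950; the result is capped at maximumEventsPerTransaction
    // and raised to minimumEventsPerTransaction whenever it drops to zero or below.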
+ if (outstandingEventsCount < 0) { + if (idleCount > 1) { + maxTuples = (int)((1 + throughputAdjustmentFactor * idleCount) * lastConsumedEventsCount); + } else { + maxTuples = (int)((1 + throughputAdjustmentFactor) * lastConsumedEventsCount); + } + } else if (outstandingEventsCount > lastConsumedEventsCount) { + maxTuples = (int)((1 - throughputAdjustmentFactor) * lastConsumedEventsCount); + } else { + if (idleCount > 0) { + maxTuples = (int)((1 + throughputAdjustmentFactor * idleCount) * lastConsumedEventsCount); + if (maxTuples <= 0) { + maxTuples = minimumEventsPerTransaction; + } + } else { + maxTuples = lastConsumedEventsCount; + } + } + + if (maxTuples >= maximumEventsPerTransaction) { + maxTuples = maximumEventsPerTransaction; + } else if (maxTuples <= 0) { + maxTuples = minimumEventsPerTransaction; + } + + if (maxTuples > 0) { + if (playback != null) { + try { + int i = 0; + do { + if (!client.write(playback)) { + retryWrite(playback, null); + } + outstandingEventsCount++; + playback = storage.retrieveNext(); + } + while (++i < maxTuples && playback != null); + } catch (Exception ex) { + logger.warn("Playback Failed", ex); + if (ex instanceof NetletThrowable) { + try { + eventloop.disconnect(client); + } finally { + client = null; + outstandingEventsCount = 0; + } + } + return Status.BACKOFF; + } + } else { + int storedTuples = 0; + + Transaction t = getChannel().getTransaction(); + try { + t.begin(); + + Event e; + while (storedTuples < maxTuples && (e = getChannel().take()) != null) { + Slice event = codec.toByteArray(e); + byte[] address = storage.store(event); + if (address != null) { + if (!client.write(address, event)) { + retryWrite(address, event); + } + outstandingEventsCount++; + } else { + logger.debug("Detected the condition of recovery from flume crash!"); + } + storedTuples++; + } + + if (storedTuples > 0) { + storage.flush(); + } + + t.commit(); + + if (storedTuples > 0) { /* log less frequently */ + logger.debug("Transaction details maxTuples = {}, storedTuples = {}, outstanding = {}", + maxTuples, storedTuples, outstandingEventsCount); + } + } catch (Error er) { + t.rollback(); + throw er; + } catch (Exception ex) { + logger.error("Transaction Failed", ex); + if (ex instanceof NetletRuntimeException && client != null) { + try { + eventloop.disconnect(client); + } finally { + client = null; + outstandingEventsCount = 0; + } + } + t.rollback(); + return Status.BACKOFF; + } finally { + t.close(); + } + + if (storedTuples == 0) { + sleep(); + } + } + } + + return Status.READY; + } + + private void sleep() + { + try { + Thread.sleep(sleepMillis); + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + } + } + + @Override + public void start() + { + try { + if (storage instanceof Component) { + @SuppressWarnings("unchecked") + Component<com.datatorrent.api.Context> component = (Component<com.datatorrent.api.Context>)storage; + component.setup(null); + } + if (discovery instanceof Component) { + @SuppressWarnings("unchecked") + Component<com.datatorrent.api.Context> component = (Component<com.datatorrent.api.Context>)discovery; + component.setup(null); + } + if (codec instanceof Component) { + @SuppressWarnings("unchecked") + Component<com.datatorrent.api.Context> component = (Component<com.datatorrent.api.Context>)codec; + component.setup(null); + } + eventloop = new DefaultEventLoop("EventLoop-" + id); + server = new Server(id, discovery,acceptedTolerance); + } catch (Error error) { + throw error; + } catch (RuntimeException re) { + throw re; + } 
catch (IOException ex) { + throw new RuntimeException(ex); + } + + eventloop.start(); + eventloop.start(hostname, port, server); + super.start(); + } + + @Override + public void stop() + { + try { + super.stop(); + } finally { + try { + if (client != null) { + eventloop.disconnect(client); + client = null; + } + + eventloop.stop(server); + eventloop.stop(); + + if (codec instanceof Component) { + @SuppressWarnings("unchecked") + Component<com.datatorrent.api.Context> component = (Component<com.datatorrent.api.Context>)codec; + component.teardown(); + } + if (discovery instanceof Component) { + @SuppressWarnings("unchecked") + Component<com.datatorrent.api.Context> component = (Component<com.datatorrent.api.Context>)discovery; + component.teardown(); + } + if (storage instanceof Component) { + @SuppressWarnings("unchecked") + Component<com.datatorrent.api.Context> component = (Component<com.datatorrent.api.Context>)storage; + component.teardown(); + } + } catch (Throwable cause) { + throw new ServiceConfigurationError("Failed Stop", cause); + } + } + } + + /* End implementing Flume Sink interface */ + + /* Begin Configurable Interface */ + @Override + public void configure(Context context) + { + hostname = context.getString(HOSTNAME_STRING, HOSTNAME_DEFAULT); + port = context.getInteger("port", 0); + id = context.getString("id"); + if (id == null) { + id = getName(); + } + acceptedTolerance = context.getLong("acceptedTolerance", ACCEPTED_TOLERANCE); + sleepMillis = context.getLong("sleepMillis", 5L); + throughputAdjustmentFactor = context.getInteger("throughputAdjustmentPercent", 5) / 100.0; + maximumEventsPerTransaction = context.getInteger("maximumEventsPerTransaction", 10000); + minimumEventsPerTransaction = context.getInteger("minimumEventsPerTransaction", 100); + commitEventTimeoutMillis = context.getLong("commitEventTimeoutMillis", Long.MAX_VALUE); + + @SuppressWarnings("unchecked") + Discovery<byte[]> ldiscovery = configure("discovery", Discovery.class, context); + if (ldiscovery == null) { + logger.warn("Discovery agent not configured for the sink!"); + discovery = new Discovery<byte[]>() + { + @Override + public void unadvertise(Service<byte[]> service) + { + logger.debug("Sink {} stopped listening on {}:{}", service.getId(), service.getHost(), service.getPort()); + } + + @Override + public void advertise(Service<byte[]> service) + { + logger.debug("Sink {} started listening on {}:{}", service.getId(), service.getHost(), service.getPort()); + } + + @Override + @SuppressWarnings("unchecked") + public Collection<Service<byte[]>> discover() + { + return Collections.EMPTY_SET; + } + + }; + } else { + discovery = ldiscovery; + } + + storage = configure("storage", Storage.class, context); + if (storage == null) { + logger.warn("storage key missing... 
DTFlumeSink may lose data!"); + storage = new Storage() + { + @Override + public byte[] store(Slice slice) + { + return null; + } + + @Override + public byte[] retrieve(byte[] identifier) + { + return null; + } + + @Override + public byte[] retrieveNext() + { + return null; + } + + @Override + public void clean(byte[] identifier) + { + } + + @Override + public void flush() + { + } + + }; + } + + @SuppressWarnings("unchecked") + StreamCodec<Event> lCodec = configure("codec", StreamCodec.class, context); + if (lCodec == null) { + codec = new EventCodec(); + } else { + codec = lCodec; + } + + } + + /* End Configurable Interface */ + + @SuppressWarnings({"UseSpecificCatch", "BroadCatchBlock", "TooBroadCatch"}) + private static <T> T configure(String key, Class<T> clazz, Context context) + { + String classname = context.getString(key); + if (classname == null) { + return null; + } + + try { + Class<?> loadClass = Thread.currentThread().getContextClassLoader().loadClass(classname); + if (clazz.isAssignableFrom(loadClass)) { + @SuppressWarnings("unchecked") + T object = (T)loadClass.newInstance(); + if (object instanceof Configurable) { + Context context1 = new Context(context.getSubProperties(key + '.')); + String id = context1.getString(Storage.ID); + if (id == null) { + id = context.getString(Storage.ID); + logger.debug("{} inherited id={} from sink", key, id); + context1.put(Storage.ID, id); + } + ((Configurable)object).configure(context1); + } + + return object; + } else { + logger.error("key class {} does not implement {} interface", classname, Storage.class.getCanonicalName()); + throw new Error("Invalid storage " + classname); + } + } catch (Error error) { + throw error; + } catch (RuntimeException re) { + throw re; + } catch (Throwable t) { + throw new RuntimeException(t); + } + } + + /** + * @return the hostname + */ + String getHostname() + { + return hostname; + } + + /** + * @param hostname the hostname to set + */ + void setHostname(String hostname) + { + this.hostname = hostname; + } + + /** + * @return the port + */ + int getPort() + { + return port; + } + + public long getAcceptedTolerance() + { + return acceptedTolerance; + } + + public void setAcceptedTolerance(long acceptedTolerance) + { + this.acceptedTolerance = acceptedTolerance; + } + + /** + * @param port the port to set + */ + void setPort(int port) + { + this.port = port; + } + + /** + * @return the discovery + */ + Discovery<byte[]> getDiscovery() + { + return discovery; + } + + /** + * @param discovery the discovery to set + */ + void setDiscovery(Discovery<byte[]> discovery) + { + this.discovery = discovery; + } + + /** + * Attempt the sequence of writing after sleeping twice and upon failure assume + * that the client connection has problems and hence close it. 
+ * + * @param address + * @param e + * @throws IOException + */ + private void retryWrite(byte[] address, Slice event) throws IOException + { + if (event == null) { /* this happens for playback where address and event are sent as single object */ + while (client.isConnected()) { + sleep(); + if (client.write(address)) { + return; + } + } + } else { /* this happens when the events are taken from the flume channel and writing first time failed */ + while (client.isConnected()) { + sleep(); + if (client.write(address, event)) { + return; + } + } + } + + throw new IOException("Client disconnected!"); + } + + private static final Logger logger = LoggerFactory.getLogger(DTFlumeSink.class); +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d200737b/flume/src/main/java/org/apache/apex/malhar/flume/sink/Server.java ---------------------------------------------------------------------- diff --git a/flume/src/main/java/org/apache/apex/malhar/flume/sink/Server.java b/flume/src/main/java/org/apache/apex/malhar/flume/sink/Server.java new file mode 100644 index 0000000..a771cb3 --- /dev/null +++ b/flume/src/main/java/org/apache/apex/malhar/flume/sink/Server.java @@ -0,0 +1,419 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.apex.malhar.flume.sink; + +import java.net.InetSocketAddress; +import java.nio.channels.SelectionKey; +import java.nio.channels.ServerSocketChannel; +import java.nio.channels.SocketChannel; +import java.util.ArrayList; +import java.util.Arrays; + +import org.apache.apex.malhar.flume.discovery.Discovery; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.datatorrent.netlet.AbstractLengthPrependerClient; +import com.datatorrent.netlet.AbstractServer; +import com.datatorrent.netlet.EventLoop; +import com.datatorrent.netlet.util.Slice; + +/** + * <p> + * Server class.</p> + * + * @since 0.9.2 + */ +public class Server extends AbstractServer +{ + private final String id; + private final Discovery<byte[]> discovery; + private final long acceptedTolerance; + + public Server(String id, Discovery<byte[]> discovery, long acceptedTolerance) + { + this.id = id; + this.discovery = discovery; + this.acceptedTolerance = acceptedTolerance; + } + + @Override + public void handleException(Exception cce, EventLoop el) + { + logger.error("Server Error", cce); + Request r = new Request(Command.SERVER_ERROR, null) + { + @Override + public Slice getAddress() + { + throw new UnsupportedOperationException("Not supported yet."); + } + + @Override + public int getEventCount() + { + throw new UnsupportedOperationException("Not supported yet."); + } + + @Override + public int getIdleCount() + { + throw new UnsupportedOperationException("Not supported yet."); + } + + }; + synchronized (requests) { + requests.add(r); + } + } + + private final Discovery.Service<byte[]> service = new Discovery.Service<byte[]>() + { + @Override + public String getHost() + { + return ((InetSocketAddress)getServerAddress()).getHostName(); + } + + @Override + public int getPort() + { + return ((InetSocketAddress)getServerAddress()).getPort(); + } + + @Override + public byte[] getPayload() + { + return null; + } + + @Override + public String getId() + { + return id; + } + + @Override + public String toString() + { + return "Server.Service{id=" + id + ", host=" + getHost() + ", port=" + getPort() + ", payload=" + + Arrays.toString(getPayload()) + '}'; + } + + }; + + @Override + public void unregistered(final SelectionKey key) + { + discovery.unadvertise(service); + super.unregistered(key); + } + + @Override + public void registered(final SelectionKey key) + { + super.registered(key); + discovery.advertise(service); + } + + public enum Command + { + ECHO((byte)0), + SEEK((byte)1), + COMMITTED((byte)2), + CHECKPOINTED((byte)3), + CONNECTED((byte)4), + DISCONNECTED((byte)5), + WINDOWED((byte)6), + SERVER_ERROR((byte)7); + + Command(byte b) + { + this.ord = b; + } + + public byte getOrdinal() + { + return ord; + } + + public static Command getCommand(byte b) + { + Command c; + switch (b) { + case 0: + c = ECHO; + break; + + case 1: + c = SEEK; + break; + + case 2: + c = COMMITTED; + break; + + case 3: + c = CHECKPOINTED; + break; + + case 4: + c = CONNECTED; + break; + + case 5: + c = DISCONNECTED; + break; + + case 6: + c = WINDOWED; + break; + + case 7: + c = SERVER_ERROR; + break; + + default: + throw new IllegalArgumentException(String.format("No Command defined for ordinal %b", b)); + } + + assert (b == c.ord); + return c; + } + + private final byte ord; + } + + public final ArrayList<Request> requests = new ArrayList<Request>(4); + + @Override + public ClientListener getClientConnection(SocketChannel sc, ServerSocketChannel ssc) + { + Client lClient = new Client(); + lClient.connected(); + return 
lClient; + } + + public class Client extends AbstractLengthPrependerClient + { + + @Override + public void onMessage(byte[] buffer, int offset, int size) + { + if (size != Request.FIXED_SIZE) { + logger.warn("Invalid Request Received: {} from {}", Arrays.copyOfRange(buffer, offset, offset + size), + key.channel()); + return; + } + + long requestTime = Server.readLong(buffer, offset + Request.TIME_OFFSET); + if (System.currentTimeMillis() > (requestTime + acceptedTolerance)) { + logger.warn("Expired Request Received: {} from {}", Arrays.copyOfRange(buffer, offset, offset + size), + key.channel()); + return; + } + + try { + if (Command.getCommand(buffer[offset]) == Command.ECHO) { + write(buffer, offset, size); + return; + } + } catch (IllegalArgumentException ex) { + logger.warn("Invalid Request Received: {} from {}!", Arrays.copyOfRange(buffer, offset, offset + size), + key.channel(), ex); + return; + } + + Request r = Request.getRequest(buffer, offset, this); + synchronized (requests) { + requests.add(r); + } + } + + @Override + public void disconnected() + { + synchronized (requests) { + requests.add(Request.getRequest( + new byte[] {Command.DISCONNECTED.getOrdinal(), 0, 0, 0, 0, 0, 0, 0, 0}, 0, this)); + } + super.disconnected(); + } + + public boolean write(byte[] address, Slice event) + { + if (event.offset == 0 && event.length == event.buffer.length) { + return write(address, event.buffer); + } + + // a better method would be to replace the write implementation and allow it to natively support writing slices + return write(address, event.toByteArray()); + } + + } + + public abstract static class Request + { + public static final int FIXED_SIZE = 17; + public static final int TIME_OFFSET = 9; + public final Command type; + public final Client client; + + public Request(Command type, Client client) + { + this.type = type; + this.client = client; + } + + public abstract Slice getAddress(); + + public abstract int getEventCount(); + + public abstract int getIdleCount(); + + @Override + public String toString() + { + return "Request{" + "type=" + type + '}'; + } + + public static Request getRequest(final byte[] buffer, final int offset, Client client) + { + Command command = Command.getCommand(buffer[offset]); + switch (command) { + case WINDOWED: + return new Request(Command.WINDOWED, client) + { + final int eventCount; + final int idleCount; + + { + eventCount = Server.readInt(buffer, offset + 1); + idleCount = Server.readInt(buffer, offset + 5); + } + + @Override + public Slice getAddress() + { + throw new UnsupportedOperationException(); + } + + @Override + public int getEventCount() + { + return eventCount; + } + + @Override + public int getIdleCount() + { + return idleCount; + } + + @Override + public String toString() + { + return "Request{" + "type=" + type + ", eventCount=" + eventCount + ", idleCount=" + idleCount + '}'; + } + + }; + + default: + return new Request(command, client) + { + final Slice address; + + { + address = new Slice(buffer, offset + 1, 8); + } + + @Override + public Slice getAddress() + { + return address; + } + + @Override + public int getEventCount() + { + throw new UnsupportedOperationException(); + } + + @Override + public int getIdleCount() + { + throw new UnsupportedOperationException(); + } + + @Override + public String toString() + { + return "Request{" + "type=" + type + ", address=" + address + '}'; + } + + }; + + } + + } + + } + + public static int readInt(byte[] buffer, int offset) + { + return buffer[offset++] & 0xff + | (buffer[offset++] & 0xff) 
<< 8 + | (buffer[offset++] & 0xff) << 16 + | (buffer[offset++] & 0xff) << 24; + } + + public static void writeInt(byte[] buffer, int offset, int i) + { + buffer[offset++] = (byte)i; + buffer[offset++] = (byte)(i >>> 8); + buffer[offset++] = (byte)(i >>> 16); + buffer[offset++] = (byte)(i >>> 24); + } + + public static long readLong(byte[] buffer, int offset) + { + return (long)buffer[offset++] & 0xff + | (long)(buffer[offset++] & 0xff) << 8 + | (long)(buffer[offset++] & 0xff) << 16 + | (long)(buffer[offset++] & 0xff) << 24 + | (long)(buffer[offset++] & 0xff) << 32 + | (long)(buffer[offset++] & 0xff) << 40 + | (long)(buffer[offset++] & 0xff) << 48 + | (long)(buffer[offset++] & 0xff) << 56; + } + + public static void writeLong(byte[] buffer, int offset, long l) + { + buffer[offset++] = (byte)l; + buffer[offset++] = (byte)(l >>> 8); + buffer[offset++] = (byte)(l >>> 16); + buffer[offset++] = (byte)(l >>> 24); + buffer[offset++] = (byte)(l >>> 32); + buffer[offset++] = (byte)(l >>> 40); + buffer[offset++] = (byte)(l >>> 48); + buffer[offset++] = (byte)(l >>> 56); + } + + private static final Logger logger = LoggerFactory.getLogger(Server.class); +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d200737b/flume/src/main/java/org/apache/apex/malhar/flume/source/HdfsTestSource.java ---------------------------------------------------------------------- diff --git a/flume/src/main/java/org/apache/apex/malhar/flume/source/HdfsTestSource.java b/flume/src/main/java/org/apache/apex/malhar/flume/source/HdfsTestSource.java new file mode 100644 index 0000000..6160bd5 --- /dev/null +++ b/flume/src/main/java/org/apache/apex/malhar/flume/source/HdfsTestSource.java @@ -0,0 +1,224 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.apex.malhar.flume.source; + +import java.io.BufferedReader; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.InputStreamReader; +import java.text.SimpleDateFormat; +import java.util.Calendar; +import java.util.Date; +import java.util.List; +import java.util.Timer; +import java.util.TimerTask; + +import javax.annotation.Nonnull; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream; +import org.apache.flume.Context; +import org.apache.flume.Event; +import org.apache.flume.EventDrivenSource; +import org.apache.flume.channel.ChannelProcessor; +import org.apache.flume.conf.Configurable; +import org.apache.flume.event.EventBuilder; +import org.apache.flume.source.AbstractSource; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocatedFileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RemoteIterator; + +import com.google.common.base.Preconditions; +import com.google.common.base.Strings; +import com.google.common.collect.Lists; + +/** + * <p>TestSource class.</p> + * + * @since 0.9.4 + */ +public class HdfsTestSource extends AbstractSource implements EventDrivenSource, Configurable +{ + public static final String SOURCE_DIR = "sourceDir"; + public static final String RATE = "rate"; + public static final String INIT_DATE = "initDate"; + + static byte FIELD_SEPARATOR = 2; + public Timer emitTimer; + @Nonnull + String directory; + Path directoryPath; + int rate; + String initDate; + long initTime; + List<String> dataFiles; + long oneDayBack; + + private transient BufferedReader br = null; + protected transient FileSystem fs; + private transient Configuration configuration; + + private transient int currentFile = 0; + private transient boolean finished; + private List<Event> events; + + public HdfsTestSource() + { + super(); + this.rate = 2500; + dataFiles = Lists.newArrayList(); + Calendar calendar = Calendar.getInstance(); + calendar.add(Calendar.DATE, -1); + oneDayBack = calendar.getTimeInMillis(); + configuration = new Configuration(); + events = Lists.newArrayList(); + } + + @Override + public void configure(Context context) + { + directory = context.getString(SOURCE_DIR); + rate = context.getInteger(RATE, rate); + initDate = context.getString(INIT_DATE); + + Preconditions.checkArgument(!Strings.isNullOrEmpty(directory)); + directoryPath = new Path(directory); + + String[] parts = initDate.split("-"); + Preconditions.checkArgument(parts.length == 3); + Calendar calendar = Calendar.getInstance(); + calendar.set(Integer.parseInt(parts[0]), Integer.parseInt(parts[1]) - 1, Integer.parseInt(parts[2]), 0, 0, 0); + initTime = calendar.getTimeInMillis(); + + try { + List<String> files = findFiles(); + for (String file : files) { + dataFiles.add(file); + } + if (logger.isDebugEnabled()) { + SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd"); + logger.debug("settings {} {} {} {} {}", directory, rate, dateFormat.format(oneDayBack), + dateFormat.format(new Date(initTime)), currentFile); + for (String file : dataFiles) { + logger.debug("settings add file {}", file); + } + } + + fs = FileSystem.newInstance(new Path(directory).toUri(), configuration); + Path filePath = new Path(dataFiles.get(currentFile)); + br = new BufferedReader(new InputStreamReader(new GzipCompressorInputStream(fs.open(filePath)))); + } catch 
(IOException e) { + throw new RuntimeException(e); + } + + finished = true; + + } + + private List<String> findFiles() throws IOException + { + List<String> files = Lists.newArrayList(); + Path directoryPath = new Path(directory); + FileSystem lfs = FileSystem.newInstance(directoryPath.toUri(), configuration); + try { + logger.debug("checking for new files in {}", directoryPath); + RemoteIterator<LocatedFileStatus> statuses = lfs.listFiles(directoryPath, true); + for (; statuses.hasNext(); ) { + FileStatus status = statuses.next(); + Path path = status.getPath(); + String filePathStr = path.toString(); + if (!filePathStr.endsWith(".gz")) { + continue; + } + logger.debug("new file {}", filePathStr); + files.add(path.toString()); + } + } catch (FileNotFoundException e) { + logger.warn("Failed to list directory {}", directoryPath, e); + throw new RuntimeException(e); + } finally { + lfs.close(); + } + return files; + } + + @Override + public void start() + { + super.start(); + emitTimer = new Timer(); + + final ChannelProcessor channelProcessor = getChannelProcessor(); + emitTimer.scheduleAtFixedRate(new TimerTask() + { + @Override + public void run() + { + int lineCount = 0; + events.clear(); + try { + while (lineCount < rate && !finished) { + String line = br.readLine(); + + if (line == null) { + logger.debug("completed file {}", currentFile); + br.close(); + currentFile++; + if (currentFile == dataFiles.size()) { + logger.info("finished all files"); + finished = true; + break; + } + Path filePath = new Path(dataFiles.get(currentFile)); + br = new BufferedReader(new InputStreamReader(new GzipCompressorInputStream(fs.open(filePath)))); + logger.info("opening file {}. {}", currentFile, filePath); + continue; + } + lineCount++; + Event flumeEvent = EventBuilder.withBody(line.getBytes()); + events.add(flumeEvent); + } + } catch (IOException e) { + throw new RuntimeException(e); + } + if (events.size() > 0) { + channelProcessor.processEventBatch(events); + } + if (finished) { + emitTimer.cancel(); + } + } + + }, 0, 1000); + } + + @Override + public void stop() + { + emitTimer.cancel(); + super.stop(); + } + + private static final Logger logger = LoggerFactory.getLogger(HdfsTestSource.class); +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d200737b/flume/src/main/java/org/apache/apex/malhar/flume/source/TestSource.java ---------------------------------------------------------------------- diff --git a/flume/src/main/java/org/apache/apex/malhar/flume/source/TestSource.java b/flume/src/main/java/org/apache/apex/malhar/flume/source/TestSource.java new file mode 100644 index 0000000..87c118f --- /dev/null +++ b/flume/src/main/java/org/apache/apex/malhar/flume/source/TestSource.java @@ -0,0 +1,250 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.flume.source; + +import java.io.BufferedReader; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.InputStreamReader; +import java.text.SimpleDateFormat; +import java.util.Calendar; +import java.util.HashMap; +import java.util.List; +import java.util.Random; +import java.util.Set; +import java.util.Timer; +import java.util.TimerTask; + +import javax.annotation.Nonnull; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.flume.Context; +import org.apache.flume.Event; +import org.apache.flume.EventDrivenSource; +import org.apache.flume.channel.ChannelProcessor; +import org.apache.flume.conf.Configurable; +import org.apache.flume.event.EventBuilder; +import org.apache.flume.source.AbstractSource; + +import com.google.common.base.Preconditions; +import com.google.common.base.Strings; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; + +/** + * <p>TestSource class.</p> + * + * @since 0.9.4 + */ +public class TestSource extends AbstractSource implements EventDrivenSource, Configurable +{ + public static final String SOURCE_FILE = "sourceFile"; + public static final String LINE_NUMBER = "lineNumber"; + public static final String RATE = "rate"; + public static final String PERCENT_PAST_EVENTS = "percentPastEvents"; + static byte FIELD_SEPARATOR = 1; + static int DEF_PERCENT_PAST_EVENTS = 5; + public Timer emitTimer; + @Nonnull + String filePath; + int rate; + int numberOfPastEvents; + transient List<Row> cache; + private transient int startIndex; + private transient Random random; + private SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd"); + private SimpleDateFormat timeFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); + + public TestSource() + { + super(); + this.rate = 2500; + this.numberOfPastEvents = DEF_PERCENT_PAST_EVENTS * 25; + this.random = new Random(); + + } + + @Override + public void configure(Context context) + { + filePath = context.getString(SOURCE_FILE); + rate = context.getInteger(RATE, rate); + int percentPastEvents = context.getInteger(PERCENT_PAST_EVENTS, DEF_PERCENT_PAST_EVENTS); + Preconditions.checkArgument(!Strings.isNullOrEmpty(filePath)); + try { + BufferedReader lineReader = new BufferedReader(new InputStreamReader(new FileInputStream(filePath))); + try { + buildCache(lineReader); + } finally { + lineReader.close(); + } + } catch (FileNotFoundException e) { + throw new RuntimeException(e); + } catch (IOException e) { + throw new RuntimeException(e); + } + + if (DEF_PERCENT_PAST_EVENTS != percentPastEvents) { + numberOfPastEvents = (int)(percentPastEvents / 100.0 * cache.size()); + } + } + + @Override + public void start() + { + super.start(); + emitTimer = new Timer(); + + final ChannelProcessor channel = getChannelProcessor(); + final int cacheSize = cache.size(); + emitTimer.scheduleAtFixedRate(new TimerTask() + { + @Override + public void run() + { + int lastIndex = startIndex + rate; + if (lastIndex > cacheSize) { + lastIndex -= cacheSize; + processBatch(channel, cache.subList(startIndex, cacheSize)); + startIndex = 0; + while (lastIndex > cacheSize) { + processBatch(channel, cache); + lastIndex -= cacheSize; + } + processBatch(channel, cache.subList(0, lastIndex)); + } else { + processBatch(channel, cache.subList(startIndex, lastIndex)); + } + startIndex = lastIndex; + } + + }, 
0, 1000); + } + + private void processBatch(ChannelProcessor channelProcessor, List<Row> rows) + { + if (rows.isEmpty()) { + return; + } + + int noise = random.nextInt(numberOfPastEvents + 1); + Set<Integer> pastIndices = Sets.newHashSet(); + for (int i = 0; i < noise; i++) { + pastIndices.add(random.nextInt(rows.size())); + } + + Calendar calendar = Calendar.getInstance(); + long high = calendar.getTimeInMillis(); + calendar.add(Calendar.DATE, -2); + long low = calendar.getTimeInMillis(); + + + + List<Event> events = Lists.newArrayList(); + for (int i = 0; i < rows.size(); i++) { + Row eventRow = rows.get(i); + if (pastIndices.contains(i)) { + long pastTime = (long)((Math.random() * (high - low)) + low); + byte[] pastDateField = dateFormat.format(pastTime).getBytes(); + byte[] pastTimeField = timeFormat.format(pastTime).getBytes(); + + System.arraycopy(pastDateField, 0, eventRow.bytes, eventRow.dateFieldStart, pastDateField.length); + System.arraycopy(pastTimeField, 0, eventRow.bytes, eventRow.timeFieldStart, pastTimeField.length); + } else { + calendar.setTimeInMillis(System.currentTimeMillis()); + byte[] currentDateField = dateFormat.format(calendar.getTime()).getBytes(); + byte[] currentTimeField = timeFormat.format(calendar.getTime()).getBytes(); + + System.arraycopy(currentDateField, 0, eventRow.bytes, eventRow.dateFieldStart, currentDateField.length); + System.arraycopy(currentTimeField, 0, eventRow.bytes, eventRow.timeFieldStart, currentTimeField.length); + } + + HashMap<String, String> headers = new HashMap<String, String>(2); + headers.put(SOURCE_FILE, filePath); + headers.put(LINE_NUMBER, String.valueOf(startIndex + i)); + events.add(EventBuilder.withBody(eventRow.bytes, headers)); + } + channelProcessor.processEventBatch(events); + } + + @Override + public void stop() + { + emitTimer.cancel(); + super.stop(); + } + + private void buildCache(BufferedReader lineReader) throws IOException + { + cache = Lists.newArrayListWithCapacity(rate); + + String line; + while ((line = lineReader.readLine()) != null) { + byte[] row = line.getBytes(); + Row eventRow = new Row(row); + final int rowsize = row.length; + + /* guid */ + int sliceLengh = -1; + while (++sliceLengh < rowsize) { + if (row[sliceLengh] == FIELD_SEPARATOR) { + break; + } + } + int recordStart = sliceLengh + 1; + int pointer = sliceLengh + 1; + while (pointer < rowsize) { + if (row[pointer++] == FIELD_SEPARATOR) { + eventRow.dateFieldStart = recordStart; + break; + } + } + + /* lets parse the date */ + int dateStart = pointer; + while (pointer < rowsize) { + if (row[pointer++] == FIELD_SEPARATOR) { + eventRow.timeFieldStart = dateStart; + break; + } + } + + cache.add(eventRow); + } + } + + private static class Row + { + final byte[] bytes; + int dateFieldStart; + int timeFieldStart; +// boolean past; + + Row(byte[] bytes) + { + this.bytes = bytes; + } + + } + + private static final Logger logger = LoggerFactory.getLogger(TestSource.class); +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d200737b/flume/src/main/java/org/apache/apex/malhar/flume/storage/DebugWrapper.java ---------------------------------------------------------------------- diff --git a/flume/src/main/java/org/apache/apex/malhar/flume/storage/DebugWrapper.java b/flume/src/main/java/org/apache/apex/malhar/flume/storage/DebugWrapper.java new file mode 100644 index 0000000..ae1ed23 --- /dev/null +++ b/flume/src/main/java/org/apache/apex/malhar/flume/storage/DebugWrapper.java @@ -0,0 +1,132 @@ +/** + * Licensed to the Apache Software Foundation (ASF) 
under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.flume.storage; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.flume.Context; +import org.apache.flume.conf.Configurable; + +import com.datatorrent.api.Component; +import com.datatorrent.netlet.util.Slice; + +/** + * <p>DebugWrapper class.</p> + * + * @since 0.9.4 + */ +public class DebugWrapper implements Storage, Configurable, Component<com.datatorrent.api.Context> +{ + HDFSStorage storage = new HDFSStorage(); + + @Override + public byte[] store(Slice bytes) + { + byte[] ret = null; + + try { + ret = storage.store(bytes); + } finally { + logger.debug("storage.store(new byte[]{{}});", bytes); + } + + return ret; + } + + @Override + public byte[] retrieve(byte[] identifier) + { + byte[] ret = null; + + try { + ret = storage.retrieve(identifier); + } finally { + logger.debug("storage.retrieve(new byte[]{{}});", identifier); + } + + return ret; + } + + @Override + public byte[] retrieveNext() + { + byte[] ret = null; + try { + ret = storage.retrieveNext(); + } finally { + logger.debug("storage.retrieveNext();"); + } + + return ret; + } + + @Override + public void clean(byte[] identifier) + { + try { + storage.clean(identifier); + } finally { + logger.debug("storage.clean(new byte[]{{}});", identifier); + } + } + + @Override + public void flush() + { + try { + storage.flush(); + } finally { + logger.debug("storage.flush();"); + } + } + + @Override + public void configure(Context cntxt) + { + try { + storage.configure(cntxt); + } finally { + logger.debug("storage.configure({});", cntxt); + } + } + + @Override + public void setup(com.datatorrent.api.Context t1) + { + try { + storage.setup(t1); + } finally { + logger.debug("storage.setup({});", t1); + } + + } + + @Override + public void teardown() + { + try { + storage.teardown(); + } finally { + logger.debug("storage.teardown();"); + } + } + + private static final Logger logger = LoggerFactory.getLogger(DebugWrapper.class); +}
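
For illustration only (not part of the patch): the sketch below builds the fixed 17-byte request buffer that Server.Client.onMessage() expects, that is, a command byte at offset 0, an 8-byte address/payload at offsets 1 through 8, and a millisecond timestamp at Request.TIME_OFFSET (offset 9). It round-trips the timestamp with local copies of the little-endian readLong/writeLong helpers from Server.java above. The class name, the placeholder command value, and the payload value are hypothetical.

public class RequestLayoutSketch
{
  // local copies of Server.writeLong/readLong from the patch above
  static void writeLong(byte[] buffer, int offset, long l)
  {
    buffer[offset++] = (byte)l;
    buffer[offset++] = (byte)(l >>> 8);
    buffer[offset++] = (byte)(l >>> 16);
    buffer[offset++] = (byte)(l >>> 24);
    buffer[offset++] = (byte)(l >>> 32);
    buffer[offset++] = (byte)(l >>> 40);
    buffer[offset++] = (byte)(l >>> 48);
    buffer[offset++] = (byte)(l >>> 56);
  }

  static long readLong(byte[] buffer, int offset)
  {
    return (long)buffer[offset++] & 0xff
        | (long)(buffer[offset++] & 0xff) << 8
        | (long)(buffer[offset++] & 0xff) << 16
        | (long)(buffer[offset++] & 0xff) << 24
        | (long)(buffer[offset++] & 0xff) << 32
        | (long)(buffer[offset++] & 0xff) << 40
        | (long)(buffer[offset++] & 0xff) << 48
        | (long)(buffer[offset++] & 0xff) << 56;
  }

  public static void main(String[] args)
  {
    final int fixedSize = 17;   // Request.FIXED_SIZE
    final int timeOffset = 9;   // Request.TIME_OFFSET

    byte[] request = new byte[fixedSize];
    request[0] = 0;                          // command ordinal; 0 is a placeholder here
    writeLong(request, 1, 42L);              // 8-byte address/payload at offsets 1..8
    long sentAt = System.currentTimeMillis();
    writeLong(request, timeOffset, sentAt);  // timestamp checked against acceptedTolerance

    // onMessage() drops the request unless its size is exactly FIXED_SIZE and the
    // embedded timestamp is within the server's accepted tolerance of "now".
    System.out.println("round-tripped timestamp: " + readLong(request, timeOffset));
    System.out.println("matches original: " + (readLong(request, timeOffset) == sentAt));
  }
}

The same byte order is used by readInt/writeInt for the two int fields (event count and idle count) that the WINDOWED request carries at offsets 1 and 5.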
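
Likewise for illustration (not part of the patch), a minimal sketch of driving TestSource.configure() programmatically through a Flume Context, using the sourceFile, rate, and percentPastEvents keys the source reads above. The file path and the property values are assumptions; in a real agent the source would instead be declared in flume.conf with its fully qualified class name and the same property keys.

import org.apache.flume.Context;

import org.apache.apex.malhar.flume.source.TestSource;

public class TestSourceConfigSketch
{
  public static void main(String[] args)
  {
    Context context = new Context();
    // hypothetical values; the file must exist and contain FIELD_SEPARATOR-delimited rows
    context.put(TestSource.SOURCE_FILE, "/tmp/sample-events.txt");
    context.put(TestSource.RATE, "1000");              // lines replayed per second
    context.put(TestSource.PERCENT_PAST_EVENTS, "5");  // approximate share of rows rewritten with past timestamps

    TestSource source = new TestSource();
    source.configure(context);  // reads the file once and builds the in-memory row cache
    // source.start() would then schedule the Timer that pushes `rate` events per second
    // to the channel processor; source.stop() cancels it.
  }
}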
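
Finally, a self-contained sketch (again not part of the patch) of the separator scan that TestSource.buildCache() performs on each line: it skips the leading guid field and records the start offsets of the date and time fields, which processBatch() later overwrites in place via System.arraycopy. The sample row below is hypothetical.

import java.nio.charset.StandardCharsets;

public class RowScanSketch
{
  static final byte FIELD_SEPARATOR = 1;  // same separator value TestSource uses

  public static void main(String[] args)
  {
    // hypothetical row layout: guid <sep> yyyy-MM-dd <sep> yyyy-MM-dd HH:mm:ss <sep> rest
    String sample = "guid-123" + (char)1 + "2014-01-01" + (char)1 + "2014-01-01 00:00:00" + (char)1 + "rest";
    byte[] row = sample.getBytes(StandardCharsets.UTF_8);

    // skip the guid: advance to the byte after the first separator
    int pointer = 0;
    while (pointer < row.length && row[pointer] != FIELD_SEPARATOR) {
      pointer++;
    }
    int dateFieldStart = ++pointer;

    // advance past the date field to find where the time field starts
    while (pointer < row.length && row[pointer] != FIELD_SEPARATOR) {
      pointer++;
    }
    int timeFieldStart = ++pointer;

    // processBatch() overwrites these fixed-width fields in place with freshly
    // formatted "yyyy-MM-dd" and "yyyy-MM-dd HH:mm:ss" values via System.arraycopy.
    System.out.println("dateFieldStart=" + dateFieldStart + " timeFieldStart=" + timeFieldStart);
    System.out.println("date field: " + new String(row, dateFieldStart, 10, StandardCharsets.UTF_8));
  }
}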
