http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/bbdab0e8/flume/src/main/java/com/datatorrent/flume/sink/Server.java ---------------------------------------------------------------------- diff --git a/flume/src/main/java/com/datatorrent/flume/sink/Server.java b/flume/src/main/java/com/datatorrent/flume/sink/Server.java new file mode 100644 index 0000000..14d9ff4 --- /dev/null +++ b/flume/src/main/java/com/datatorrent/flume/sink/Server.java @@ -0,0 +1,419 @@ +/** + * Copyright (c) 2016 DataTorrent, Inc. ALL Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package com.datatorrent.flume.sink; + +import java.net.InetSocketAddress; +import java.nio.channels.SelectionKey; +import java.nio.channels.ServerSocketChannel; +import java.nio.channels.SocketChannel; +import java.util.ArrayList; +import java.util.Arrays; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.datatorrent.flume.discovery.Discovery; +import com.datatorrent.flume.discovery.Discovery.Service; +import com.datatorrent.netlet.AbstractLengthPrependerClient; +import com.datatorrent.netlet.AbstractServer; +import com.datatorrent.netlet.EventLoop; +import com.datatorrent.netlet.util.Slice; + +/** + * <p> + * Server class.</p> + * + * @author Chetan Narsude <[email protected]> + * @since 0.9.2 + */ +public class Server extends AbstractServer +{ + private final String id; + private final Discovery<byte[]> discovery; + private final long acceptedTolerance; + + public Server(String id, Discovery<byte[]> discovery, long acceptedTolerance) + { + this.id = id; + this.discovery = discovery; + this.acceptedTolerance = acceptedTolerance; + } + + @Override + public void handleException(Exception cce, EventLoop el) + { + logger.error("Server Error", cce); + Request r = new Request(Command.SERVER_ERROR, null) + { + @Override + public Slice getAddress() + { + throw new UnsupportedOperationException("Not supported yet."); + } + + @Override + public int getEventCount() + { + throw new UnsupportedOperationException("Not supported yet."); + } + + @Override + public int getIdleCount() + { + throw new UnsupportedOperationException("Not supported yet."); + } + + }; + synchronized (requests) { + requests.add(r); + } + } + + private final Service<byte[]> service = new Service<byte[]>() + { + @Override + public String getHost() + { + return ((InetSocketAddress)getServerAddress()).getHostName(); + } + + @Override + public int getPort() + { + return ((InetSocketAddress)getServerAddress()).getPort(); + } + + @Override + public byte[] getPayload() + { + return 
null; + } + + @Override + public String getId() + { + return id; + } + + @Override + public String toString() + { + return "Server.Service{id=" + id + ", host=" + getHost() + ", port=" + getPort() + ", payload=" + + Arrays.toString(getPayload()) + '}'; + } + + }; + + @Override + public void unregistered(final SelectionKey key) + { + discovery.unadvertise(service); + super.unregistered(key); + } + + @Override + public void registered(final SelectionKey key) + { + super.registered(key); + discovery.advertise(service); + } + + public enum Command + { + ECHO((byte)0), + SEEK((byte)1), + COMMITTED((byte)2), + CHECKPOINTED((byte)3), + CONNECTED((byte)4), + DISCONNECTED((byte)5), + WINDOWED((byte)6), + SERVER_ERROR((byte)7); + + Command(byte b) + { + this.ord = b; + } + + public byte getOrdinal() + { + return ord; + } + + public static Command getCommand(byte b) + { + Command c; + switch (b) { + case 0: + c = ECHO; + break; + + case 1: + c = SEEK; + break; + + case 2: + c = COMMITTED; + break; + + case 3: + c = CHECKPOINTED; + break; + + case 4: + c = CONNECTED; + break; + + case 5: + c = DISCONNECTED; + break; + + case 6: + c = WINDOWED; + break; + + case 7: + c = SERVER_ERROR; + break; + + default: + throw new IllegalArgumentException(String.format("No Command defined for ordinal %b", b)); + } + + assert (b == c.ord); + return c; + } + + private final byte ord; + } + + public final ArrayList<Request> requests = new ArrayList<Request>(4); + + @Override + public ClientListener getClientConnection(SocketChannel sc, ServerSocketChannel ssc) + { + Client lClient = new Client(); + lClient.connected(); + return lClient; + } + + public class Client extends AbstractLengthPrependerClient + { + + @Override + public void onMessage(byte[] buffer, int offset, int size) + { + if (size != Request.FIXED_SIZE) { + logger.warn("Invalid Request Received: {} from {}", Arrays.copyOfRange(buffer, offset, offset + size), + key.channel()); + return; + } + + long requestTime = 
Server.readLong(buffer, offset + Request.TIME_OFFSET); + if (System.currentTimeMillis() > (requestTime + acceptedTolerance)) { + logger.warn("Expired Request Received: {} from {}", Arrays.copyOfRange(buffer, offset, offset + size), + key.channel()); + return; + } + + try { + if (Command.getCommand(buffer[offset]) == Command.ECHO) { + write(buffer, offset, size); + return; + } + } catch (IllegalArgumentException ex) { + logger.warn("Invalid Request Received: {} from {}!", Arrays.copyOfRange(buffer, offset, offset + size), + key.channel(), ex); + return; + } + + Request r = Request.getRequest(buffer, offset, this); + synchronized (requests) { + requests.add(r); + } + } + + @Override + public void disconnected() + { + synchronized (requests) { + requests.add(Request.getRequest( + new byte[] {Command.DISCONNECTED.getOrdinal(), 0, 0, 0, 0, 0, 0, 0, 0}, 0, this)); + } + super.disconnected(); + } + + public boolean write(byte[] address, Slice event) + { + if (event.offset == 0 && event.length == event.buffer.length) { + return write(address, event.buffer); + } + + // a better method would be to replace the write implementation and allow it to natively support writing slices + return write(address, event.toByteArray()); + } + + } + + public abstract static class Request + { + public static final int FIXED_SIZE = 17; + public static final int TIME_OFFSET = 9; + public final Command type; + public final Client client; + + public Request(Command type, Client client) + { + this.type = type; + this.client = client; + } + + public abstract Slice getAddress(); + + public abstract int getEventCount(); + + public abstract int getIdleCount(); + + @Override + public String toString() + { + return "Request{" + "type=" + type + '}'; + } + + public static Request getRequest(final byte[] buffer, final int offset, Client client) + { + Command command = Command.getCommand(buffer[offset]); + switch (command) { + case WINDOWED: + return new Request(Command.WINDOWED, client) + { + final int 
eventCount; + final int idleCount; + + { + eventCount = Server.readInt(buffer, offset + 1); + idleCount = Server.readInt(buffer, offset + 5); + } + + @Override + public Slice getAddress() + { + throw new UnsupportedOperationException(); + } + + @Override + public int getEventCount() + { + return eventCount; + } + + @Override + public int getIdleCount() + { + return idleCount; + } + + @Override + public String toString() + { + return "Request{" + "type=" + type + ", eventCount=" + eventCount + ", idleCount=" + idleCount + '}'; + } + + }; + + default: + return new Request(command, client) + { + final Slice address; + + { + address = new Slice(buffer, offset + 1, 8); + } + + @Override + public Slice getAddress() + { + return address; + } + + @Override + public int getEventCount() + { + throw new UnsupportedOperationException(); + } + + @Override + public int getIdleCount() + { + throw new UnsupportedOperationException(); + } + + @Override + public String toString() + { + return "Request{" + "type=" + type + ", address=" + address + '}'; + } + + }; + + } + + } + + } + + public static int readInt(byte[] buffer, int offset) + { + return buffer[offset++] & 0xff + | (buffer[offset++] & 0xff) << 8 + | (buffer[offset++] & 0xff) << 16 + | (buffer[offset++] & 0xff) << 24; + } + + public static void writeInt(byte[] buffer, int offset, int i) + { + buffer[offset++] = (byte)i; + buffer[offset++] = (byte)(i >>> 8); + buffer[offset++] = (byte)(i >>> 16); + buffer[offset++] = (byte)(i >>> 24); + } + + public static long readLong(byte[] buffer, int offset) + { + return (long)buffer[offset++] & 0xff + | (long)(buffer[offset++] & 0xff) << 8 + | (long)(buffer[offset++] & 0xff) << 16 + | (long)(buffer[offset++] & 0xff) << 24 + | (long)(buffer[offset++] & 0xff) << 32 + | (long)(buffer[offset++] & 0xff) << 40 + | (long)(buffer[offset++] & 0xff) << 48 + | (long)(buffer[offset++] & 0xff) << 56; + } + + public static void writeLong(byte[] buffer, int offset, long l) + { + buffer[offset++] = 
(byte)l; + buffer[offset++] = (byte)(l >>> 8); + buffer[offset++] = (byte)(l >>> 16); + buffer[offset++] = (byte)(l >>> 24); + buffer[offset++] = (byte)(l >>> 32); + buffer[offset++] = (byte)(l >>> 40); + buffer[offset++] = (byte)(l >>> 48); + buffer[offset++] = (byte)(l >>> 56); + } + + private static final Logger logger = LoggerFactory.getLogger(Server.class); +}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/bbdab0e8/flume/src/main/java/com/datatorrent/flume/source/TestSource.java ---------------------------------------------------------------------- diff --git a/flume/src/main/java/com/datatorrent/flume/source/TestSource.java b/flume/src/main/java/com/datatorrent/flume/source/TestSource.java new file mode 100644 index 0000000..490ac35 --- /dev/null +++ b/flume/src/main/java/com/datatorrent/flume/source/TestSource.java @@ -0,0 +1,248 @@ +/** + * Copyright (c) 2016 DataTorrent, Inc. ALL Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package com.datatorrent.flume.source; + +import java.io.BufferedReader; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.InputStreamReader; +import java.text.SimpleDateFormat; +import java.util.Calendar; +import java.util.HashMap; +import java.util.List; +import java.util.Random; +import java.util.Set; +import java.util.Timer; +import java.util.TimerTask; + +import javax.annotation.Nonnull; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.flume.Context; +import org.apache.flume.Event; +import org.apache.flume.EventDrivenSource; +import org.apache.flume.channel.ChannelProcessor; +import org.apache.flume.conf.Configurable; +import org.apache.flume.event.EventBuilder; +import org.apache.flume.source.AbstractSource; + +import com.google.common.base.Preconditions; +import com.google.common.base.Strings; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; + +/** + * <p>TestSource class.</p> + * + * @since 0.9.4 + */ +public class TestSource extends AbstractSource implements EventDrivenSource, Configurable +{ + public static final String SOURCE_FILE = "sourceFile"; + public static final String LINE_NUMBER = "lineNumber"; + public static final String RATE = "rate"; + public static final String PERCENT_PAST_EVENTS = "percentPastEvents"; + static byte FIELD_SEPARATOR = 1; + static int DEF_PERCENT_PAST_EVENTS = 5; + public Timer emitTimer; + @Nonnull + String filePath; + int rate; + int numberOfPastEvents; + transient List<Row> cache; + private transient int startIndex; + private transient Random random; + private SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd"); + private SimpleDateFormat timeFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); + + public TestSource() + { + super(); + this.rate = 2500; + this.numberOfPastEvents = DEF_PERCENT_PAST_EVENTS * 25; + this.random = new Random(); + + } + + @Override + public void 
configure(Context context) + { + filePath = context.getString(SOURCE_FILE); + rate = context.getInteger(RATE, rate); + int percentPastEvents = context.getInteger(PERCENT_PAST_EVENTS, DEF_PERCENT_PAST_EVENTS); + Preconditions.checkArgument(!Strings.isNullOrEmpty(filePath)); + try { + BufferedReader lineReader = new BufferedReader(new InputStreamReader(new FileInputStream(filePath))); + try { + buildCache(lineReader); + } finally { + lineReader.close(); + } + } catch (FileNotFoundException e) { + throw new RuntimeException(e); + } catch (IOException e) { + throw new RuntimeException(e); + } + + if (DEF_PERCENT_PAST_EVENTS != percentPastEvents) { + numberOfPastEvents = (int)(percentPastEvents / 100.0 * cache.size()); + } + } + + @Override + public void start() + { + super.start(); + emitTimer = new Timer(); + + final ChannelProcessor channel = getChannelProcessor(); + final int cacheSize = cache.size(); + emitTimer.scheduleAtFixedRate(new TimerTask() + { + @Override + public void run() + { + int lastIndex = startIndex + rate; + if (lastIndex > cacheSize) { + lastIndex -= cacheSize; + processBatch(channel, cache.subList(startIndex, cacheSize)); + startIndex = 0; + while (lastIndex > cacheSize) { + processBatch(channel, cache); + lastIndex -= cacheSize; + } + processBatch(channel, cache.subList(0, lastIndex)); + } else { + processBatch(channel, cache.subList(startIndex, lastIndex)); + } + startIndex = lastIndex; + } + + }, 0, 1000); + } + + private void processBatch(ChannelProcessor channelProcessor, List<Row> rows) + { + if (rows.isEmpty()) { + return; + } + + int noise = random.nextInt(numberOfPastEvents + 1); + Set<Integer> pastIndices = Sets.newHashSet(); + for (int i = 0; i < noise; i++) { + pastIndices.add(random.nextInt(rows.size())); + } + + Calendar calendar = Calendar.getInstance(); + long high = calendar.getTimeInMillis(); + calendar.add(Calendar.DATE, -2); + long low = calendar.getTimeInMillis(); + + + + List<Event> events = Lists.newArrayList(); + for (int 
i = 0; i < rows.size(); i++) { + Row eventRow = rows.get(i); + if (pastIndices.contains(i)) { + long pastTime = (long)((Math.random() * (high - low)) + low); + byte[] pastDateField = dateFormat.format(pastTime).getBytes(); + byte[] pastTimeField = timeFormat.format(pastTime).getBytes(); + + System.arraycopy(pastDateField, 0, eventRow.bytes, eventRow.dateFieldStart, pastDateField.length); + System.arraycopy(pastTimeField, 0, eventRow.bytes, eventRow.timeFieldStart, pastTimeField.length); + } else { + calendar.setTimeInMillis(System.currentTimeMillis()); + byte[] currentDateField = dateFormat.format(calendar.getTime()).getBytes(); + byte[] currentTimeField = timeFormat.format(calendar.getTime()).getBytes(); + + System.arraycopy(currentDateField, 0, eventRow.bytes, eventRow.dateFieldStart, currentDateField.length); + System.arraycopy(currentTimeField, 0, eventRow.bytes, eventRow.timeFieldStart, currentTimeField.length); + } + + HashMap<String, String> headers = new HashMap<String, String>(2); + headers.put(SOURCE_FILE, filePath); + headers.put(LINE_NUMBER, String.valueOf(startIndex + i)); + events.add(EventBuilder.withBody(eventRow.bytes, headers)); + } + channelProcessor.processEventBatch(events); + } + + @Override + public void stop() + { + emitTimer.cancel(); + super.stop(); + } + + private void buildCache(BufferedReader lineReader) throws IOException + { + cache = Lists.newArrayListWithCapacity(rate); + + String line; + while ((line = lineReader.readLine()) != null) { + byte[] row = line.getBytes(); + Row eventRow = new Row(row); + final int rowsize = row.length; + + /* guid */ + int sliceLengh = -1; + while (++sliceLengh < rowsize) { + if (row[sliceLengh] == FIELD_SEPARATOR) { + break; + } + } + int recordStart = sliceLengh + 1; + int pointer = sliceLengh + 1; + while (pointer < rowsize) { + if (row[pointer++] == FIELD_SEPARATOR) { + eventRow.dateFieldStart = recordStart; + break; + } + } + + /* lets parse the date */ + int dateStart = pointer; + while (pointer < 
rowsize) { + if (row[pointer++] == FIELD_SEPARATOR) { + eventRow.timeFieldStart = dateStart; + break; + } + } + + cache.add(eventRow); + } + } + + private static class Row + { + final byte[] bytes; + int dateFieldStart; + int timeFieldStart; +// boolean past; + + Row(byte[] bytes) + { + this.bytes = bytes; + } + + } + + private static final Logger logger = LoggerFactory.getLogger(TestSource.class); +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/bbdab0e8/flume/src/main/java/com/datatorrent/flume/storage/DebugWrapper.java ---------------------------------------------------------------------- diff --git a/flume/src/main/java/com/datatorrent/flume/storage/DebugWrapper.java b/flume/src/main/java/com/datatorrent/flume/storage/DebugWrapper.java new file mode 100644 index 0000000..c416418 --- /dev/null +++ b/flume/src/main/java/com/datatorrent/flume/storage/DebugWrapper.java @@ -0,0 +1,131 @@ +/** + * Copyright (c) 2016 DataTorrent, Inc. ALL Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package com.datatorrent.flume.storage; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.flume.Context; +import org.apache.flume.conf.Configurable; + +import com.datatorrent.api.Component; +import com.datatorrent.netlet.util.Slice; + +/** + * <p>DebugWrapper class.</p> + * + * @author Chetan Narsude <[email protected]> + * @since 0.9.4 + */ +public class DebugWrapper implements Storage, Configurable, Component<com.datatorrent.api.Context> +{ + HDFSStorage storage = new HDFSStorage(); + + @Override + public byte[] store(Slice bytes) + { + byte[] ret = null; + + try { + ret = storage.store(bytes); + } finally { + logger.debug("storage.store(new byte[]{{}});", bytes); + } + + return ret; + } + + @Override + public byte[] retrieve(byte[] identifier) + { + byte[] ret = null; + + try { + ret = storage.retrieve(identifier); + } finally { + logger.debug("storage.retrieve(new byte[]{{}});", identifier); + } + + return ret; + } + + @Override + public byte[] retrieveNext() + { + byte[] ret = null; + try { + ret = storage.retrieveNext(); + } finally { + logger.debug("storage.retrieveNext();"); + } + + return ret; + } + + @Override + public void clean(byte[] identifier) + { + try { + storage.clean(identifier); + } finally { + logger.debug("storage.clean(new byte[]{{}});", identifier); + } + } + + @Override + public void flush() + { + try { + storage.flush(); + } finally { + logger.debug("storage.flush();"); + } + } + + @Override + public void configure(Context cntxt) + { + try { + storage.configure(cntxt); + } finally { + logger.debug("storage.configure({});", cntxt); + } + } + + @Override + public void setup(com.datatorrent.api.Context t1) + { + try { + storage.setup(t1); + } finally { + logger.debug("storage.setup({});", t1); + } + + } + + @Override + public void teardown() + { + try { + storage.teardown(); + } finally { + logger.debug("storage.teardown();"); + } + } + + private static final Logger logger = 
LoggerFactory.getLogger(DebugWrapper.class); +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/bbdab0e8/flume/src/main/java/com/datatorrent/flume/storage/ErrorMaskingEventCodec.java ---------------------------------------------------------------------- diff --git a/flume/src/main/java/com/datatorrent/flume/storage/ErrorMaskingEventCodec.java b/flume/src/main/java/com/datatorrent/flume/storage/ErrorMaskingEventCodec.java new file mode 100644 index 0000000..59c7fd3 --- /dev/null +++ b/flume/src/main/java/com/datatorrent/flume/storage/ErrorMaskingEventCodec.java @@ -0,0 +1,61 @@ +/** + * Copyright (c) 2016 DataTorrent, Inc. ALL Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package com.datatorrent.flume.storage; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.flume.Event; + +import com.datatorrent.netlet.util.Slice; + +/** + * <p>ErrorMaskingEventCodec class.</p> + * + * @author Chetan Narsude <[email protected]> + * @since 1.0.4 + */ +public class ErrorMaskingEventCodec extends EventCodec +{ + + @Override + public Object fromByteArray(Slice fragment) + { + try { + return super.fromByteArray(fragment); + } catch (RuntimeException re) { + logger.warn("Cannot deserialize event {}", fragment, re); + } + + return null; + } + + @Override + public Slice toByteArray(Event event) + { + try { + return super.toByteArray(event); + } catch (RuntimeException re) { + logger.warn("Cannot serialize event {}", event, re); + } + + return null; + } + + + private static final Logger logger = LoggerFactory.getLogger(ErrorMaskingEventCodec.class); +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/bbdab0e8/flume/src/main/java/com/datatorrent/flume/storage/EventCodec.java ---------------------------------------------------------------------- diff --git a/flume/src/main/java/com/datatorrent/flume/storage/EventCodec.java b/flume/src/main/java/com/datatorrent/flume/storage/EventCodec.java new file mode 100644 index 0000000..03d0d87 --- /dev/null +++ b/flume/src/main/java/com/datatorrent/flume/storage/EventCodec.java @@ -0,0 +1,91 @@ +/** + * Copyright (c) 2016 DataTorrent, Inc. ALL Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.flume.storage; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.util.HashMap; +import java.util.Map; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.flume.Event; +import org.apache.flume.event.EventBuilder; + +import com.esotericsoftware.kryo.Kryo; +import com.esotericsoftware.kryo.io.Input; +import com.esotericsoftware.kryo.io.Output; + +import com.datatorrent.api.StreamCodec; +import com.datatorrent.netlet.util.Slice; + +/** + * <p>EventCodec class.</p> + * + * @author Chetan Narsude <[email protected]> + * @since 0.9.4 + */ +public class EventCodec implements StreamCodec<Event> +{ + private final transient Kryo kryo; + + public EventCodec() + { + this.kryo = new Kryo(); + this.kryo.setClassLoader(Thread.currentThread().getContextClassLoader()); + } + + @Override + public Object fromByteArray(Slice fragment) + { + ByteArrayInputStream is = new ByteArrayInputStream(fragment.buffer, fragment.offset, fragment.length); + Input input = new Input(is); + + @SuppressWarnings("unchecked") + HashMap<String, String> headers = kryo.readObjectOrNull(input, HashMap.class); + byte[] body = kryo.readObjectOrNull(input, byte[].class); + return EventBuilder.withBody(body, headers); + } + + @Override + public Slice toByteArray(Event event) + { + ByteArrayOutputStream os = new ByteArrayOutputStream(); + Output output = new Output(os); + + Map<String, String> headers = event.getHeaders(); + if (headers != null && headers.getClass() != HashMap.class) { + HashMap<String, String> tmp = new HashMap<String, String>(headers.size()); + tmp.putAll(headers); + headers = tmp; + } + kryo.writeObjectOrNull(output, headers, HashMap.class); + kryo.writeObjectOrNull(output, event.getBody(), byte[].class); + output.flush(); + final byte[] bytes = os.toByteArray(); + return new Slice(bytes, 
0, bytes.length); + } + + @Override + public int getPartition(Event o) + { + return o.hashCode(); + } + + private static final Logger logger = LoggerFactory.getLogger(EventCodec.class); +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/bbdab0e8/flume/src/main/resources/flume-conf/flume-conf.sample.properties ---------------------------------------------------------------------- diff --git a/flume/src/main/resources/flume-conf/flume-conf.sample.properties b/flume/src/main/resources/flume-conf/flume-conf.sample.properties new file mode 100644 index 0000000..9d3e430 --- /dev/null +++ b/flume/src/main/resources/flume-conf/flume-conf.sample.properties @@ -0,0 +1,45 @@ +# +# Copyright (c) 2016 DataTorrent, Inc. ALL Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
#
# Copyright (c) 2016 DataTorrent, Inc. ALL Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#

# Sample configuration for a single Flume agent ("agent1") with one source,
# one channel and one sink.

#agent1 on node1
 agent1.sources = netcatSource
 agent1.channels = ch1
 agent1.sinks = dt

# first sink - dt
# The sink class is DTFlumeSink and it drains channel ch1 (see the last key of this group).
 agent1.sinks.dt.type = com.datatorrent.flume.sink.DTFlumeSink
 agent1.sinks.dt.id = sink1
 agent1.sinks.dt.hostname = localhost
 agent1.sinks.dt.port = 8080
 agent1.sinks.dt.sleepMillis = 7
 agent1.sinks.dt.throughputAdjustmentFactor = 2
 agent1.sinks.dt.maximumEventsPerTransaction = 5000
 agent1.sinks.dt.minimumEventsPerTransaction = 1
# Storage class used by the sink, followed by the settings passed under the "storage." prefix.
 agent1.sinks.dt.storage = com.datatorrent.flume.storage.HDFSStorage
 agent1.sinks.dt.storage.restore = false
 agent1.sinks.dt.storage.baseDir = /tmp/flume101
 agent1.sinks.dt.channel = ch1

# channels
 agent1.channels.ch1.type = file
 agent1.channels.ch1.capacity = 10000000
 agent1.channels.ch1.transactionCapacity = 10000
# 67108864 bytes = 64 MiB
 agent1.channels.ch1.maxFileSize = 67108864

# source
# NOTE(review): the source is named netcatSource but is configured as an "exec" source that
# runs the command below - confirm the name is intentional. The command paths are relative,
# so they presumably assume the agent is started from the project directory - verify.
 agent1.sources.netcatSource.type = exec
 agent1.sources.netcatSource.channels = ch1
 agent1.sources.netcatSource.command = src/test/bash/subcat_periodically src/test/resources/test_data/dt_spend 10000 1
#!/bin/bash
#
# Copyright (c) 2016 DataTorrent, Inc. ALL Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#


# This script expects to run on a machine which has a maven repository populated
# under $HOME/.m2. If that's not the case, please adjust the JARPATH variable below
# to point to a colon separated list of directories where jar files can be found.
#
# Input:  DT_FLUME_JAR (required), JARPATH and JAVA_HOME (optional).
# Output: FLUME_CLASSPATH is set to whatever the JarPath tool prints.
if test -z "$DT_FLUME_JAR"
then
  # Quoted so that "[ERROR]" is not treated as a glob pattern by the shell.
  echo "[ERROR]: Environment variable DT_FLUME_JAR should point to a valid jar file which contains DTFlumeSink class" >&2
  exit 2
fi

# ${VAR:=default} also assigns the default to JARPATH when it is unset or empty.
echo "JARPATH is set to ${JARPATH:=$HOME/.m2/repository:.}"
if test -z "$JAVA_HOME"
then
  JAVA=java
else
  JAVA="${JAVA_HOME}/bin/java"
fi
# Variables are quoted so that paths containing spaces are passed as single arguments.
FLUME_CLASSPATH=$(JARPATH="$JARPATH" "$JAVA" -cp "$DT_FLUME_JAR" com.datatorrent.jarpath.JarPath -N "$DT_FLUME_JAR" -Xdt-jarpath -Xdt-netlet)
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.flume.discovery; + +import org.codehaus.jackson.type.TypeReference; +import org.junit.Ignore; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.curator.x.discovery.ServiceInstance; +import org.apache.curator.x.discovery.details.InstanceSerializer; + +import com.datatorrent.flume.discovery.Discovery.Service; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertNotNull; + +/** + * + * @author Chetan Narsude <[email protected]> + */ +@Ignore +public class ZKAssistedDiscoveryTest +{ + public ZKAssistedDiscoveryTest() + { + } + + @Test + public void testSerialization() throws Exception + { + ZKAssistedDiscovery discovery = new ZKAssistedDiscovery(); + discovery.setServiceName("DTFlumeTest"); + discovery.setConnectionString("localhost:2181"); + discovery.setBasePath("/HelloDT"); + discovery.setup(null); + ServiceInstance<byte[]> instance = discovery.getInstance(new Service<byte[]>() + { + @Override + public String getHost() + { + return "localhost"; + } + + @Override + public int getPort() + { + return 8080; + } + + @Override + public byte[] getPayload() + { + return null; + } + + @Override + public String getId() + { + return "localhost8080"; + } + + }); + InstanceSerializer<byte[]> instanceSerializer = + discovery.getInstanceSerializerFactory().getInstanceSerializer(new TypeReference<ServiceInstance<byte[]>>() + { + }); + byte[] serialize = instanceSerializer.serialize(instance); + logger.debug("serialized json = {}", new 
String(serialize)); + ServiceInstance<byte[]> deserialize = instanceSerializer.deserialize(serialize); + assertArrayEquals("Metadata", instance.getPayload(), deserialize.getPayload()); + } + + @Test + public void testDiscover() + { + ZKAssistedDiscovery discovery = new ZKAssistedDiscovery(); + discovery.setServiceName("DTFlumeTest"); + discovery.setConnectionString("localhost:2181"); + discovery.setBasePath("/HelloDT"); + discovery.setup(null); + assertNotNull("Discovered Sinks", discovery.discover()); + discovery.teardown(); + } + + @Test + public void testAdvertize() + { + ZKAssistedDiscovery discovery = new ZKAssistedDiscovery(); + discovery.setServiceName("DTFlumeTest"); + discovery.setConnectionString("localhost:2181"); + discovery.setBasePath("/HelloDT"); + discovery.setup(null); + + Service<byte[]> service = new Service<byte[]>() + { + @Override + public String getHost() + { + return "chetan"; + } + + @Override + public int getPort() + { + return 5033; + } + + @Override + public byte[] getPayload() + { + return new byte[] {3, 2, 1}; + } + + @Override + public String getId() + { + return "uniqueId"; + } + + }; + discovery.advertise(service); + discovery.teardown(); + } + + private static final Logger logger = LoggerFactory.getLogger(ZKAssistedDiscoveryTest.class); +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/bbdab0e8/flume/src/test/java/com/datatorrent/flume/integration/ApplicationTest.java ---------------------------------------------------------------------- diff --git a/flume/src/test/java/com/datatorrent/flume/integration/ApplicationTest.java b/flume/src/test/java/com/datatorrent/flume/integration/ApplicationTest.java new file mode 100644 index 0000000..41364c8 --- /dev/null +++ b/flume/src/test/java/com/datatorrent/flume/integration/ApplicationTest.java @@ -0,0 +1,116 @@ +/** + * Copyright (c) 2016 DataTorrent, Inc. ALL Rights Reserved. 
+ * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.flume.integration; + +import org.junit.Ignore; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.flume.Event; +import org.apache.hadoop.conf.Configuration; + +import com.datatorrent.api.Context.OperatorContext; +import com.datatorrent.api.DAG; +import com.datatorrent.api.DAG.Locality; +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.LocalMode; +import com.datatorrent.api.Operator; +import com.datatorrent.api.StreamingApplication; +import com.datatorrent.flume.operator.AbstractFlumeInputOperator; +import com.datatorrent.flume.storage.EventCodec; + +/** + * + * @author Chetan Narsude <[email protected]> + */ +@Ignore +public class ApplicationTest implements StreamingApplication +{ + public static class FlumeInputOperator extends AbstractFlumeInputOperator<Event> + { + @Override + public Event convert(Event event) + { + return event; + } + } + + public static class Counter implements Operator + { + private int count; + private transient Event event; + public final transient DefaultInputPort<Event> input = new DefaultInputPort<Event>() + { + @Override + public void process(Event tuple) + { + count++; + event = tuple; + } + + }; + + @Override + public void beginWindow(long windowId) + { + } + + @Override + public void endWindow() + { + logger.debug("total count = {}, tuple = {}", count, event); + } + + 
@Override + public void setup(OperatorContext context) + { + } + + @Override + public void teardown() + { + } + + private static final Logger logger = LoggerFactory.getLogger(Counter.class); + } + + @Override + public void populateDAG(DAG dag, Configuration conf) + { + dag.setAttribute(com.datatorrent.api.Context.DAGContext.STREAMING_WINDOW_SIZE_MILLIS, 1000); + FlumeInputOperator flume = dag.addOperator("FlumeOperator", new FlumeInputOperator()); + flume.setConnectAddresses(new String[]{"test:127.0.0.1:8080"}); + flume.setCodec(new EventCodec()); + Counter counter = dag.addOperator("Counter", new Counter()); + + dag.addStream("Slices", flume.output, counter.input).setLocality(Locality.CONTAINER_LOCAL); + } + + @Test + public void test() + { + try { + LocalMode.runApp(this, Integer.MAX_VALUE); + } catch (Exception ex) { + logger.warn("The dag seems to be not testable yet, if it's - remove this exception handling", ex); + } + + } + + private static final Logger logger = LoggerFactory.getLogger(ApplicationTest.class); +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/bbdab0e8/flume/src/test/java/com/datatorrent/flume/interceptor/ColumnFilteringInterceptorTest.java ---------------------------------------------------------------------- diff --git a/flume/src/test/java/com/datatorrent/flume/interceptor/ColumnFilteringInterceptorTest.java b/flume/src/test/java/com/datatorrent/flume/interceptor/ColumnFilteringInterceptorTest.java new file mode 100644 index 0000000..464df42 --- /dev/null +++ b/flume/src/test/java/com/datatorrent/flume/interceptor/ColumnFilteringInterceptorTest.java @@ -0,0 +1,85 @@ +/** + * Copyright (c) 2016 DataTorrent, Inc. ALL Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.flume.interceptor; + +import java.io.IOException; +import java.net.URISyntaxException; +import java.util.HashMap; + +import org.junit.BeforeClass; +import org.junit.Test; + +import org.apache.flume.Context; +import org.apache.flume.interceptor.Interceptor; + +import static org.junit.Assert.assertArrayEquals; + +/** + * @author Chetan Narsude <[email protected]> + */ +public class ColumnFilteringInterceptorTest +{ + private static InterceptorTestHelper helper; + + @BeforeClass + public static void startUp() + { + HashMap<String, String> contextMap = new HashMap<String, String>(); + contextMap.put(ColumnFilteringInterceptor.Constants.DST_SEPARATOR, Byte.toString((byte)1)); + contextMap.put(ColumnFilteringInterceptor.Constants.SRC_SEPARATOR, Byte.toString((byte)2)); + contextMap.put(ColumnFilteringInterceptor.Constants.COLUMNS, "1 2 3"); + + helper = new InterceptorTestHelper(new ColumnFilteringInterceptor.Builder(), contextMap); + } + + @Test + public void testInterceptEvent() + { + helper.testIntercept_Event(); + } + + @Test + public void testFiles() throws IOException, URISyntaxException + { + helper.testFiles(); + } + + @Test + public void testInterceptEventWithColumnZero() + { + HashMap<String, String> contextMap = new HashMap<String, String>(); + contextMap.put(ColumnFilteringInterceptor.Constants.DST_SEPARATOR, Byte.toString((byte)1)); + contextMap.put(ColumnFilteringInterceptor.Constants.SRC_SEPARATOR, Byte.toString((byte)2)); + contextMap.put(ColumnFilteringInterceptor.Constants.COLUMNS, "0"); + + 
ColumnFilteringInterceptor.Builder builder = new ColumnFilteringInterceptor.Builder(); + builder.configure(new Context(contextMap)); + Interceptor interceptor = builder.build(); + + assertArrayEquals("Empty Bytes", + "\001".getBytes(), + interceptor.intercept(new InterceptorTestHelper.MyEvent("".getBytes())).getBody()); + + assertArrayEquals("One Field", + "First\001".getBytes(), + interceptor.intercept(new InterceptorTestHelper.MyEvent("First".getBytes())).getBody()); + + assertArrayEquals("Two Fields", + "\001".getBytes(), + interceptor.intercept(new InterceptorTestHelper.MyEvent("\002First".getBytes())).getBody()); + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/bbdab0e8/flume/src/test/java/com/datatorrent/flume/interceptor/InterceptorTestHelper.java ---------------------------------------------------------------------- diff --git a/flume/src/test/java/com/datatorrent/flume/interceptor/InterceptorTestHelper.java b/flume/src/test/java/com/datatorrent/flume/interceptor/InterceptorTestHelper.java new file mode 100644 index 0000000..739184f --- /dev/null +++ b/flume/src/test/java/com/datatorrent/flume/interceptor/InterceptorTestHelper.java @@ -0,0 +1,214 @@ +/** + * Copyright (c) 2016 DataTorrent, Inc. ALL Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package com.datatorrent.flume.interceptor; + +import java.io.BufferedReader; +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.net.URISyntaxException; +import java.net.URL; +import java.util.HashMap; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Properties; + +import org.junit.Assert; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.flume.Context; +import org.apache.flume.Event; +import org.apache.flume.interceptor.Interceptor; + +import com.datatorrent.netlet.util.Slice; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertNotNull; + +/** + * @author Chetan Narsude <[email protected]> + */ +public class InterceptorTestHelper +{ + private static final byte FIELD_SEPARATOR = 1; + + static class MyEvent implements Event + { + byte[] body; + + MyEvent(byte[] bytes) + { + body = bytes; + } + + @Override + public Map<String, String> getHeaders() + { + return null; + } + + @Override + public void setHeaders(Map<String, String> map) + { + } + + @Override + @SuppressWarnings("ReturnOfCollectionOrArrayField") + public byte[] getBody() + { + return body; + } + + @Override + @SuppressWarnings("AssignmentToCollectionOrArrayFieldFromParameter") + public void setBody(byte[] bytes) + { + body = bytes; + } + } + + private final Interceptor.Builder builder; + private final Map<String, String> context; + + InterceptorTestHelper(Interceptor.Builder builder, Map<String, String> context) + { + this.builder = builder; + this.context = context; + } + + public void testIntercept_Event() + { + builder.configure(new Context(context)); + Interceptor interceptor = builder.build(); + + assertArrayEquals("Empty Bytes", + "\001\001\001".getBytes(), + interceptor.intercept(new MyEvent("".getBytes())).getBody()); + + assertArrayEquals("One Separator", + "\001\001\001".getBytes(), + interceptor.intercept(new 
MyEvent("\002".getBytes())).getBody()); + + assertArrayEquals("Two Separators", + "\001\001\001".getBytes(), + interceptor.intercept(new MyEvent("\002\002".getBytes())).getBody()); + + assertArrayEquals("One Field", + "\001\001\001".getBytes(), + interceptor.intercept(new MyEvent("First".getBytes())).getBody()); + + assertArrayEquals("Two Fields", + "First\001\001\001".getBytes(), + interceptor.intercept(new MyEvent("\002First".getBytes())).getBody()); + + assertArrayEquals("Two Fields", + "\001\001\001".getBytes(), + interceptor.intercept(new MyEvent("First\001".getBytes())).getBody()); + + assertArrayEquals("Two Fields", + "Second\001\001\001".getBytes(), + interceptor.intercept(new MyEvent("First\002Second".getBytes())).getBody()); + + assertArrayEquals("Three Fields", + "Second\001\001\001".getBytes(), + interceptor.intercept(new MyEvent("First\002Second\002".getBytes())).getBody()); + + assertArrayEquals("Three Fields", + "\001Second\001\001".getBytes(), + interceptor.intercept(new MyEvent("First\002\002Second".getBytes())).getBody()); + + assertArrayEquals("Four Fields", + "\001Second\001\001".getBytes(), + interceptor.intercept(new MyEvent("First\002\002Second\002".getBytes())).getBody()); + + assertArrayEquals("Five Fields", + "\001Second\001\001".getBytes(), + interceptor.intercept(new MyEvent("First\002\002Second\002\002".getBytes())).getBody()); + + assertArrayEquals("Six Fields", + "\001Second\001\001".getBytes(), + interceptor.intercept(new MyEvent("First\002\002Second\002\002\002".getBytes())).getBody()); + } + + public void testFiles() throws IOException, URISyntaxException + { + Properties properties = new Properties(); + properties.load(getClass().getResourceAsStream("/flume/conf/flume-conf.properties")); + + String interceptor = null; + for (Entry<Object, Object> entry : properties.entrySet()) { + logger.debug("{} => {}", entry.getKey(), entry.getValue()); + + if (builder.getClass().getName().equals(entry.getValue().toString())) { + String key = 
entry.getKey().toString(); + if (key.endsWith(".type")) { + interceptor = key.substring(0, key.length() - "type".length()); + break; + } + } + } + + assertNotNull(builder.getClass().getName(), interceptor); + @SuppressWarnings({"null", "ConstantConditions"}) + final int interceptorLength = interceptor.length(); + + HashMap<String, String> map = new HashMap<String, String>(); + for (Entry<Object, Object> entry : properties.entrySet()) { + String key = entry.getKey().toString(); + if (key.startsWith(interceptor)) { + map.put(key.substring(interceptorLength), entry.getValue().toString()); + } + } + + builder.configure(new Context(map)); + Interceptor interceptorInstance = builder.build(); + + URL url = getClass().getResource("/test_data/gentxns/"); + assertNotNull("Generated Transactions", url); + + int records = 0; + File dir = new File(url.toURI()); + for (File file : dir.listFiles()) { + records += processFile(file, interceptorInstance); + } + + Assert.assertEquals("Total Records", 2200, records); + } + + private int processFile(File file, Interceptor interceptor) throws IOException + { + InputStream stream = getClass().getResourceAsStream("/test_data/gentxns/" + file.getName()); + BufferedReader br = new BufferedReader(new InputStreamReader(stream)); + + String line; + int i = 0; + while ((line = br.readLine()) != null) { + byte[] body = interceptor.intercept(new MyEvent(line.getBytes())).getBody(); + RawEvent event = RawEvent.from(body, FIELD_SEPARATOR); + Assert.assertEquals("GUID", new Slice(line.getBytes(), 0, 32), event.guid); + logger.debug("guid = {}, time = {}", event.guid, event.time); + i++; + } + + br.close(); + return i; + } + + private static final Logger logger = LoggerFactory.getLogger(InterceptorTestHelper.class); +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/bbdab0e8/flume/src/test/java/com/datatorrent/flume/interceptor/RawEvent.java ---------------------------------------------------------------------- diff --git 
a/flume/src/test/java/com/datatorrent/flume/interceptor/RawEvent.java b/flume/src/test/java/com/datatorrent/flume/interceptor/RawEvent.java new file mode 100644 index 0000000..049609b --- /dev/null +++ b/flume/src/test/java/com/datatorrent/flume/interceptor/RawEvent.java @@ -0,0 +1,119 @@ +/** + * Copyright (c) 2016 DataTorrent, Inc. ALL Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.flume.interceptor; + +import java.io.Serializable; + +import org.joda.time.format.DateTimeFormat; +import org.joda.time.format.DateTimeFormatter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.datatorrent.netlet.util.Slice; + +/** + * + * @author Chetan Narsude <[email protected]> + */ +public class RawEvent implements Serializable +{ + public Slice guid; + public long time; + public int dimensionsOffset; + + public Slice getGUID() + { + return guid; + } + + public long getTime() + { + return time; + } + + RawEvent() + { + /* needed for Kryo serialization */ + } + + public static RawEvent from(byte[] row, byte separator) + { + final int rowsize = row.length; + + /* + * Lets get the guid out of the current record + */ + int sliceLengh = -1; + while (++sliceLengh < rowsize) { + if (row[sliceLengh] == separator) { + break; + } + } + + int i = sliceLengh + 1; + + /* lets parse the date */ + int dateStart = i; + while (i < rowsize) { + if (row[i++] == separator) { + long time = DATE_PARSER.parseMillis(new 
String(row, dateStart, i - dateStart - 1)); + RawEvent event = new RawEvent(); + event.guid = new Slice(row, 0, sliceLengh); + event.time = time; + event.dimensionsOffset = i; + return event; + } + } + + return null; + } + + @Override + public int hashCode() + { + int hash = 5; + hash = 61 * hash + (this.guid != null ? this.guid.hashCode() : 0); + hash = 61 * hash + (int)(this.time ^ (this.time >>> 32)); + return hash; + } + + @Override + public String toString() + { + return "RawEvent{" + "guid=" + guid + ", time=" + time + '}'; + } + + @Override + public boolean equals(Object obj) + { + if (obj == null) { + return false; + } + if (getClass() != obj.getClass()) { + return false; + } + final RawEvent other = (RawEvent)obj; + if (this.guid != other.guid && (this.guid == null || !this.guid.equals(other.guid))) { + return false; + } + return this.time == other.time; + } + + private static final DateTimeFormatter DATE_PARSER = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss"); + private static final Logger logger = LoggerFactory.getLogger(RawEvent.class); + private static final long serialVersionUID = 201312191312L; +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/bbdab0e8/flume/src/test/java/com/datatorrent/flume/operator/AbstractFlumeInputOperatorTest.java ---------------------------------------------------------------------- diff --git a/flume/src/test/java/com/datatorrent/flume/operator/AbstractFlumeInputOperatorTest.java b/flume/src/test/java/com/datatorrent/flume/operator/AbstractFlumeInputOperatorTest.java new file mode 100644 index 0000000..a615496 --- /dev/null +++ b/flume/src/test/java/com/datatorrent/flume/operator/AbstractFlumeInputOperatorTest.java @@ -0,0 +1,56 @@ +/** + * Copyright (c) 2016 DataTorrent, Inc. ALL Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.flume.operator; + +import java.util.HashSet; +import java.util.Set; + +import org.junit.Test; + +import static org.junit.Assert.assertTrue; + +/** + * + * @author Chetan Narsude <[email protected]> + */ +public class AbstractFlumeInputOperatorTest +{ + public AbstractFlumeInputOperatorTest() + { + } + + @Test + public void testThreadLocal() + { + ThreadLocal<Set<Integer>> tl = new ThreadLocal<Set<Integer>>() + { + @Override + protected Set<Integer> initialValue() + { + return new HashSet<Integer>(); + } + + }; + Set<Integer> get1 = tl.get(); + get1.add(1); + assertTrue("Just Added Value", get1.contains(1)); + + Set<Integer> get2 = tl.get(); + assertTrue("Previously added value", get2.contains(1)); + } + +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/bbdab0e8/flume/src/test/java/com/datatorrent/flume/sink/DTFlumeSinkTest.java ---------------------------------------------------------------------- diff --git a/flume/src/test/java/com/datatorrent/flume/sink/DTFlumeSinkTest.java b/flume/src/test/java/com/datatorrent/flume/sink/DTFlumeSinkTest.java new file mode 100644 index 0000000..833a353 --- /dev/null +++ b/flume/src/test/java/com/datatorrent/flume/sink/DTFlumeSinkTest.java @@ -0,0 +1,143 @@ +/** + * Copyright (c) 2016 DataTorrent, Inc. ALL Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.flume.sink; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; + +import org.junit.Assert; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.flume.channel.MemoryChannel; + +import com.datatorrent.flume.discovery.Discovery; +import com.datatorrent.netlet.AbstractLengthPrependerClient; +import com.datatorrent.netlet.DefaultEventLoop; +import com.datatorrent.netlet.util.Slice; + +/** + * @author Chetan Narsude <[email protected]> + */ +public class DTFlumeSinkTest +{ + static final String hostname = "localhost"; + int port = 0; + + @Test + @SuppressWarnings("SleepWhileInLoop") + public void testServer() throws InterruptedException, IOException + { + Discovery<byte[]> discovery = new Discovery<byte[]>() + { + @Override + public synchronized void unadvertise(Service<byte[]> service) + { + notify(); + } + + @Override + public synchronized void advertise(Service<byte[]> service) + { + port = service.getPort(); + logger.debug("listening at {}", service); + notify(); + } + + @Override + @SuppressWarnings("unchecked") + public synchronized Collection<Service<byte[]>> discover() + { + try { + wait(); + } catch (InterruptedException ie) { + throw new RuntimeException(ie); + } + return Collections.EMPTY_LIST; + } + + }; + DTFlumeSink sink = new DTFlumeSink(); + sink.setName("TeskSink"); + sink.setHostname(hostname); + sink.setPort(0); + sink.setAcceptedTolerance(2000); + 
sink.setChannel(new MemoryChannel()); + sink.setDiscovery(discovery); + sink.start(); + AbstractLengthPrependerClient client = new AbstractLengthPrependerClient() + { + private byte[] array; + private int offset = 2; + + @Override + public void onMessage(byte[] buffer, int offset, int size) + { + Slice received = new Slice(buffer, offset, size); + logger.debug("Client Received = {}", received); + Assert.assertEquals(received, + new Slice(Arrays.copyOfRange(array, this.offset, array.length), 0, Server.Request.FIXED_SIZE)); + synchronized (DTFlumeSinkTest.this) { + DTFlumeSinkTest.this.notify(); + } + } + + @Override + public void connected() + { + super.connected(); + array = new byte[Server.Request.FIXED_SIZE + offset]; + array[offset] = Server.Command.ECHO.getOrdinal(); + array[offset + 1] = 1; + array[offset + 2] = 2; + array[offset + 3] = 3; + array[offset + 4] = 4; + array[offset + 5] = 5; + array[offset + 6] = 6; + array[offset + 7] = 7; + array[offset + 8] = 8; + Server.writeLong(array, offset + Server.Request.TIME_OFFSET, System.currentTimeMillis()); + write(array, offset, Server.Request.FIXED_SIZE); + } + + }; + + DefaultEventLoop eventloop = new DefaultEventLoop("Eventloop-TestClient"); + eventloop.start(); + discovery.discover(); + try { + eventloop.connect(new InetSocketAddress(hostname, port), client); + try { + synchronized (this) { + this.wait(); + } + } finally { + eventloop.disconnect(client); + } + } finally { + eventloop.stop(); + } + + sink.stop(); + } + + private static final Logger logger = LoggerFactory.getLogger(DTFlumeSinkTest.class); +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/bbdab0e8/flume/src/test/java/com/datatorrent/flume/sink/ServerTest.java ---------------------------------------------------------------------- diff --git a/flume/src/test/java/com/datatorrent/flume/sink/ServerTest.java b/flume/src/test/java/com/datatorrent/flume/sink/ServerTest.java new file mode 100644 index 0000000..64495db --- /dev/null +++ 
b/flume/src/test/java/com/datatorrent/flume/sink/ServerTest.java @@ -0,0 +1,92 @@ +/** + * Copyright (c) 2016 DataTorrent, Inc. ALL Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.flume.sink; + +import java.util.Random; + +import org.junit.Assert; +import org.junit.Test; + +/** + * + * @author Chetan Narsude <[email protected]> + */ +public class ServerTest +{ + byte[] array; + + public ServerTest() + { + array = new byte[1024]; + } + + @Test + public void testInt() + { + Server.writeInt(array, 0, Integer.MAX_VALUE); + Assert.assertEquals("Max Integer", Integer.MAX_VALUE, Server.readInt(array, 0)); + + Server.writeInt(array, 0, Integer.MIN_VALUE); + Assert.assertEquals("Min Integer", Integer.MIN_VALUE, Server.readInt(array, 0)); + + Server.writeInt(array, 0, 0); + Assert.assertEquals("Zero Integer", 0, Server.readInt(array, 0)); + + Random rand = new Random(); + for (int i = 0; i < 128; i++) { + int n = rand.nextInt(); + if (rand.nextBoolean()) { + n = -n; + } + Server.writeInt(array, 0, n); + Assert.assertEquals("Random Integer", n, Server.readInt(array, 0)); + } + } + + @Test + public void testLong() + { + Server.writeLong(array, 0, Integer.MAX_VALUE); + Assert.assertEquals("Max Integer", Integer.MAX_VALUE, Server.readLong(array, 0)); + + Server.writeLong(array, 0, Integer.MIN_VALUE); + Assert.assertEquals("Min Integer", Integer.MIN_VALUE, Server.readLong(array, 0)); + + Server.writeLong(array, 0, 0); + 
Assert.assertEquals("Zero Integer", 0L, Server.readLong(array, 0)); + + Server.writeLong(array, 0, Long.MAX_VALUE); + Assert.assertEquals("Max Long", Long.MAX_VALUE, Server.readLong(array, 0)); + + Server.writeLong(array, 0, Long.MIN_VALUE); + Assert.assertEquals("Min Long", Long.MIN_VALUE, Server.readLong(array, 0)); + + Server.writeLong(array, 0, 0L); + Assert.assertEquals("Zero Long", 0L, Server.readLong(array, 0)); + + Random rand = new Random(); + for (int i = 0; i < 128; i++) { + long n = rand.nextLong(); + if (rand.nextBoolean()) { + n = -n; + } + Server.writeLong(array, 0, n); + Assert.assertEquals("Random Long", n, Server.readLong(array, 0)); + } + } + +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/bbdab0e8/flume/src/test/resources/flume/conf/flume-conf.properties ---------------------------------------------------------------------- diff --git a/flume/src/test/resources/flume/conf/flume-conf.properties b/flume/src/test/resources/flume/conf/flume-conf.properties new file mode 100644 index 0000000..c892c53 --- /dev/null +++ b/flume/src/test/resources/flume/conf/flume-conf.properties @@ -0,0 +1,85 @@ +# +# Copyright (c) 2016 DataTorrent, Inc. ALL Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+# + +#agent1 on node1 +agent1.channels = ch1 +agent1.sources = netcatSource +agent1.sinks = dt + +# channels +agent1.channels.ch1.type = file +agent1.channels.ch1.capacity = 10000000 +agent1.channels.ch1.transactionCapacity = 10000 +agent1.channels.ch1.maxFileSize = 67108864 + +agent1.sources.netcatSource.type = exec +agent1.sources.netcatSource.channels = ch1 +agent1.sources.netcatSource.command = src/test/bash/subcat_periodically src/test/resources/test_data/dt_spend 10000 1 +# Pick and Reorder the columns we need from a larger record for efficiency + agent1.sources.netcatSource.interceptors = columnchooser + agent1.sources.netcatSource.interceptors.columnchooser.type = com.datatorrent.flume.interceptor.ColumnFilteringInterceptor$Builder + agent1.sources.netcatSource.interceptors.columnchooser.srcSeparator = 2 + agent1.sources.netcatSource.interceptors.columnchooser.dstSeparator = 1 + agent1.sources.netcatSource.interceptors.columnchooser.columns = 0 43 62 69 68 139 190 70 71 52 75 37 39 42 191 138 + + agent2.sources.netcatSource.interceptors.columnchooser.type = com.datatorrent.flume.interceptor.ColumnFilteringFormattingInterceptor$Builder + agent2.sources.netcatSource.interceptors.columnchooser.srcSeparator = 2 + agent2.sources.netcatSource.interceptors.columnchooser.columnsFormatter = {0}\u0001{43}\u0001{62}\u0001{69}\u0001{68}\u0001{139}\u0001{190}\u0001{70}\u0001{71}\u0001{52}\u0001{75}\u0001{37}\u0001{39}\u0001{42}\u0001{191}\u0001{138}\u0001 + +# index -- description -- type if different +# 0 Slice guid; // long +# 43 public long time // yyyy-MM-dd HH:mm:ss +# 62 public long adv_id; +# 69 public int cmp_type; // string +# 68 public long cmp_id; +# 139 public long line_id; +# 190 public long bslice_id; +# 70 public long ao_id; +# 71 public long creative_id; +# 52 public long algo_id; +# 75 public int device_model_id; // string +# 37 public long impressions; +# 39 public long clicks; +# 42 public double spend; +# 191 public double bonus_spend; +# 138 public 
double spend_local; +# + +# first sink - dt +agent1.sinks.dt.id = CEVL00P +agent1.sinks.dt.type = com.datatorrent.flume.sink.DTFlumeSink +agent1.sinks.dt.hostname = localhost +agent1.sinks.dt.port = 8080 +agent1.sinks.dt.sleepMillis = 7 +agent1.sinks.dt.throughputAdjustmentFactor = 2 +agent1.sinks.dt.maximumEventsPerTransaction = 5000 +agent1.sinks.dt.minimumEventsPerTransaction = 1 + +# Ensure that we do not lose the data handed over to us by flume. + agent1.sinks.dt.storage = com.datatorrent.flume.storage.HDFSStorage + agent1.sinks.dt.storage.restore = false + agent1.sinks.dt.storage.baseDir = /tmp/flume101 + agent1.sinks.dt.channel = ch1 + +# Ensure that we are able to detect flume sinks (and failures) automatically. + agent1.sinks.dt.discovery = com.datatorrent.flume.discovery.ZKAssistedDiscovery + agent1.sinks.dt.discovery.connectionString = 127.0.0.1:2181 + agent1.sinks.dt.discovery.basePath = /HelloDT + agent1.sinks.dt.discovery.connectionTimeoutMillis = 1000 + agent1.sinks.dt.discovery.connectionRetryCount = 10 + agent1.sinks.dt.discovery.connectionRetrySleepMillis = 500 + http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/bbdab0e8/flume/src/test/resources/flume/conf/flume-env.sh ---------------------------------------------------------------------- diff --git a/flume/src/test/resources/flume/conf/flume-env.sh b/flume/src/test/resources/flume/conf/flume-env.sh new file mode 100644 index 0000000..c2232ea --- /dev/null +++ b/flume/src/test/resources/flume/conf/flume-env.sh @@ -0,0 +1,36 @@ +#!/bin/bash +# +# Copyright (c) 2016 DataTorrent, Inc. ALL Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + + +# This script assumes it runs on a machine that has a Maven repository populated under +# $HOME/.m2. If that's not the case, please adjust the JARPATH variable below +# to point to a colon-separated list of directories where jar files can be found +if test -z "$DT_FLUME_JAR" +then + echo [ERROR]: Environment variable DT_FLUME_JAR should point to a valid jar file which contains DTFlumeSink class >&2 + exit 2 +fi + +echo JARPATH is set to ${JARPATH:=$HOME/.m2/repository:.} +if test -z "$JAVA_HOME" +then + JAVA=java +else + JAVA=${JAVA_HOME}/bin/java +fi +FLUME_CLASSPATH=`JARPATH=$JARPATH $JAVA -cp $DT_FLUME_JAR com.datatorrent.jarpath.JarPath -N $DT_FLUME_JAR -Xdt-jarpath -Xdt-netlet` \ No newline at end of file http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/bbdab0e8/flume/src/test/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/flume/src/test/resources/log4j.properties b/flume/src/test/resources/log4j.properties new file mode 100644 index 0000000..ac0a107 --- /dev/null +++ b/flume/src/test/resources/log4j.properties @@ -0,0 +1,38 @@ +# +# Copyright (c) 2016 DataTorrent, Inc. ALL Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +log4j.rootLogger=INFO,CONSOLE + +log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender +log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout +log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n + +log4j.appender.malhar=org.apache.log4j.RollingFileAppender +log4j.appender.malhar.layout=org.apache.log4j.PatternLayout +log4j.appender.malhar.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n +#log4j.appender.malhar.File=/tmp/app.log + +# to enable, add SYSLOG to rootLogger +log4j.appender.SYSLOG=org.apache.log4j.net.SyslogAppender +log4j.appender.SYSLOG.syslogHost=127.0.0.1 +log4j.appender.SYSLOG.layout=org.apache.log4j.PatternLayout +log4j.appender.SYSLOG.layout.conversionPattern=${dt.cid} %-5p [%t] %c{2} %x - %m%n +log4j.appender.SYSLOG.Facility=LOCAL1 + +#log4j.logger.org=info +#log4j.logger.org.apache.commons.beanutils=warn +log4j.logger.com.datatorrent=debug http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/bbdab0e8/flume/src/test/resources/test_data/gentxns/2013121500 ---------------------------------------------------------------------- diff --git a/flume/src/test/resources/test_data/gentxns/2013121500 b/flume/src/test/resources/test_data/gentxns/2013121500 new file mode 100644 index 0000000..3ce5646 Binary files /dev/null and b/flume/src/test/resources/test_data/gentxns/2013121500 differ http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/bbdab0e8/flume/src/test/resources/test_data/gentxns/2013121501 ---------------------------------------------------------------------- diff 
--git a/flume/src/test/resources/test_data/gentxns/2013121501 b/flume/src/test/resources/test_data/gentxns/2013121501 new file mode 100644 index 0000000..b2e70c0 Binary files /dev/null and b/flume/src/test/resources/test_data/gentxns/2013121501 differ http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/bbdab0e8/flume/src/test/resources/test_data/gentxns/2013121502 ---------------------------------------------------------------------- diff --git a/flume/src/test/resources/test_data/gentxns/2013121502 b/flume/src/test/resources/test_data/gentxns/2013121502 new file mode 100644 index 0000000..ec13862 Binary files /dev/null and b/flume/src/test/resources/test_data/gentxns/2013121502 differ http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/bbdab0e8/flume/src/test/resources/test_data/gentxns/2013121503 ---------------------------------------------------------------------- diff --git a/flume/src/test/resources/test_data/gentxns/2013121503 b/flume/src/test/resources/test_data/gentxns/2013121503 new file mode 100644 index 0000000..8267dd3 Binary files /dev/null and b/flume/src/test/resources/test_data/gentxns/2013121503 differ http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/bbdab0e8/flume/src/test/resources/test_data/gentxns/2013121504 ---------------------------------------------------------------------- diff --git a/flume/src/test/resources/test_data/gentxns/2013121504 b/flume/src/test/resources/test_data/gentxns/2013121504 new file mode 100644 index 0000000..addfe62 Binary files /dev/null and b/flume/src/test/resources/test_data/gentxns/2013121504 differ http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/bbdab0e8/flume/src/test/resources/test_data/gentxns/2013121505 ---------------------------------------------------------------------- diff --git a/flume/src/test/resources/test_data/gentxns/2013121505 b/flume/src/test/resources/test_data/gentxns/2013121505 new file mode 100644 index 0000000..d76aa9f Binary files /dev/null and 
b/flume/src/test/resources/test_data/gentxns/2013121505 differ http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/bbdab0e8/flume/src/test/resources/test_data/gentxns/2013121506 ---------------------------------------------------------------------- diff --git a/flume/src/test/resources/test_data/gentxns/2013121506 b/flume/src/test/resources/test_data/gentxns/2013121506 new file mode 100644 index 0000000..2f5bbb6 Binary files /dev/null and b/flume/src/test/resources/test_data/gentxns/2013121506 differ http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/bbdab0e8/flume/src/test/resources/test_data/gentxns/2013121507 ---------------------------------------------------------------------- diff --git a/flume/src/test/resources/test_data/gentxns/2013121507 b/flume/src/test/resources/test_data/gentxns/2013121507 new file mode 100644 index 0000000..a022dad Binary files /dev/null and b/flume/src/test/resources/test_data/gentxns/2013121507 differ http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/bbdab0e8/flume/src/test/resources/test_data/gentxns/2013121508 ---------------------------------------------------------------------- diff --git a/flume/src/test/resources/test_data/gentxns/2013121508 b/flume/src/test/resources/test_data/gentxns/2013121508 new file mode 100644 index 0000000..d1e7f5c Binary files /dev/null and b/flume/src/test/resources/test_data/gentxns/2013121508 differ http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/bbdab0e8/flume/src/test/resources/test_data/gentxns/2013121509 ---------------------------------------------------------------------- diff --git a/flume/src/test/resources/test_data/gentxns/2013121509 b/flume/src/test/resources/test_data/gentxns/2013121509 new file mode 100644 index 0000000..10d61de Binary files /dev/null and b/flume/src/test/resources/test_data/gentxns/2013121509 differ http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/bbdab0e8/flume/src/test/resources/test_data/gentxns/2013121510 
---------------------------------------------------------------------- diff --git a/flume/src/test/resources/test_data/gentxns/2013121510 b/flume/src/test/resources/test_data/gentxns/2013121510 new file mode 100644 index 0000000..c2f76c8 Binary files /dev/null and b/flume/src/test/resources/test_data/gentxns/2013121510 differ http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/bbdab0e8/flume/src/test/resources/test_data/gentxns/2013121511 ---------------------------------------------------------------------- diff --git a/flume/src/test/resources/test_data/gentxns/2013121511 b/flume/src/test/resources/test_data/gentxns/2013121511 new file mode 100644 index 0000000..bf16cfe Binary files /dev/null and b/flume/src/test/resources/test_data/gentxns/2013121511 differ http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/bbdab0e8/flume/src/test/resources/test_data/gentxns/2013121512 ---------------------------------------------------------------------- diff --git a/flume/src/test/resources/test_data/gentxns/2013121512 b/flume/src/test/resources/test_data/gentxns/2013121512 new file mode 100644 index 0000000..fe75419 Binary files /dev/null and b/flume/src/test/resources/test_data/gentxns/2013121512 differ http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/bbdab0e8/flume/src/test/resources/test_data/gentxns/2013121513 ---------------------------------------------------------------------- diff --git a/flume/src/test/resources/test_data/gentxns/2013121513 b/flume/src/test/resources/test_data/gentxns/2013121513 new file mode 100644 index 0000000..3094cae Binary files /dev/null and b/flume/src/test/resources/test_data/gentxns/2013121513 differ http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/bbdab0e8/flume/src/test/resources/test_data/gentxns/2013121514 ---------------------------------------------------------------------- diff --git a/flume/src/test/resources/test_data/gentxns/2013121514 b/flume/src/test/resources/test_data/gentxns/2013121514 new file mode 
100644 index 0000000..6e00e4a Binary files /dev/null and b/flume/src/test/resources/test_data/gentxns/2013121514 differ http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/bbdab0e8/flume/src/test/resources/test_data/gentxns/2013121515 ---------------------------------------------------------------------- diff --git a/flume/src/test/resources/test_data/gentxns/2013121515 b/flume/src/test/resources/test_data/gentxns/2013121515 new file mode 100644 index 0000000..b860e43 Binary files /dev/null and b/flume/src/test/resources/test_data/gentxns/2013121515 differ http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/bbdab0e8/flume/src/test/resources/test_data/gentxns/2013121516 ---------------------------------------------------------------------- diff --git a/flume/src/test/resources/test_data/gentxns/2013121516 b/flume/src/test/resources/test_data/gentxns/2013121516 new file mode 100644 index 0000000..dfb5854 Binary files /dev/null and b/flume/src/test/resources/test_data/gentxns/2013121516 differ http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/bbdab0e8/flume/src/test/resources/test_data/gentxns/2013121517 ---------------------------------------------------------------------- diff --git a/flume/src/test/resources/test_data/gentxns/2013121517 b/flume/src/test/resources/test_data/gentxns/2013121517 new file mode 100644 index 0000000..c8da2cc Binary files /dev/null and b/flume/src/test/resources/test_data/gentxns/2013121517 differ http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/bbdab0e8/flume/src/test/resources/test_data/gentxns/2013121518 ---------------------------------------------------------------------- diff --git a/flume/src/test/resources/test_data/gentxns/2013121518 b/flume/src/test/resources/test_data/gentxns/2013121518 new file mode 100644 index 0000000..2cb628b Binary files /dev/null and b/flume/src/test/resources/test_data/gentxns/2013121518 differ 
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/bbdab0e8/flume/src/test/resources/test_data/gentxns/2013121519 ---------------------------------------------------------------------- diff --git a/flume/src/test/resources/test_data/gentxns/2013121519 b/flume/src/test/resources/test_data/gentxns/2013121519 new file mode 100644 index 0000000..6fab9d9 Binary files /dev/null and b/flume/src/test/resources/test_data/gentxns/2013121519 differ http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/bbdab0e8/flume/src/test/resources/test_data/gentxns/2013121520 ---------------------------------------------------------------------- diff --git a/flume/src/test/resources/test_data/gentxns/2013121520 b/flume/src/test/resources/test_data/gentxns/2013121520 new file mode 100644 index 0000000..ba56d49 Binary files /dev/null and b/flume/src/test/resources/test_data/gentxns/2013121520 differ http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/bbdab0e8/flume/src/test/resources/test_data/gentxns/2013121521 ---------------------------------------------------------------------- diff --git a/flume/src/test/resources/test_data/gentxns/2013121521 b/flume/src/test/resources/test_data/gentxns/2013121521 new file mode 100644 index 0000000..37de926 Binary files /dev/null and b/flume/src/test/resources/test_data/gentxns/2013121521 differ
