http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d200737b/flume/src/main/java/com/datatorrent/flume/sink/Server.java ---------------------------------------------------------------------- diff --git a/flume/src/main/java/com/datatorrent/flume/sink/Server.java b/flume/src/main/java/com/datatorrent/flume/sink/Server.java deleted file mode 100644 index 03c1ff0..0000000 --- a/flume/src/main/java/com/datatorrent/flume/sink/Server.java +++ /dev/null @@ -1,420 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package com.datatorrent.flume.sink; - -import java.net.InetSocketAddress; -import java.nio.channels.SelectionKey; -import java.nio.channels.ServerSocketChannel; -import java.nio.channels.SocketChannel; -import java.util.ArrayList; -import java.util.Arrays; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.datatorrent.flume.discovery.Discovery; -import com.datatorrent.flume.discovery.Discovery.Service; -import com.datatorrent.netlet.AbstractLengthPrependerClient; -import com.datatorrent.netlet.AbstractServer; -import com.datatorrent.netlet.EventLoop; -import com.datatorrent.netlet.util.Slice; - -/** - * <p> - * Server class.</p> - * - * @since 0.9.2 - */ -public class Server extends AbstractServer -{ - private final String id; - private final Discovery<byte[]> discovery; - private final long acceptedTolerance; - - public Server(String id, Discovery<byte[]> discovery, long acceptedTolerance) - { - this.id = id; - this.discovery = discovery; - this.acceptedTolerance = acceptedTolerance; - } - - @Override - public void handleException(Exception cce, EventLoop el) - { - logger.error("Server Error", cce); - Request r = new Request(Command.SERVER_ERROR, null) - { - @Override - public Slice getAddress() - { - throw new UnsupportedOperationException("Not supported yet."); - } - - @Override - public int getEventCount() - { - throw new UnsupportedOperationException("Not supported yet."); - } - - @Override - public int getIdleCount() - { - throw new UnsupportedOperationException("Not supported yet."); - } - - }; - synchronized (requests) { - requests.add(r); - } - } - - private final Service<byte[]> service = new Service<byte[]>() - { - @Override - public String getHost() - { - return ((InetSocketAddress)getServerAddress()).getHostName(); - } - - @Override - public int getPort() - { - return ((InetSocketAddress)getServerAddress()).getPort(); - } - - @Override - public byte[] getPayload() - { - return null; - } - - @Override - public String getId() - { - return id; - } - - @Override - public String toString() - { - return "Server.Service{id=" + id + ", host=" + getHost() + ", port=" + getPort() + ", payload=" + - Arrays.toString(getPayload()) + '}'; - } - - }; - - @Override - public void unregistered(final 
SelectionKey key) - { - discovery.unadvertise(service); - super.unregistered(key); - } - - @Override - public void registered(final SelectionKey key) - { - super.registered(key); - discovery.advertise(service); - } - - public enum Command - { - ECHO((byte)0), - SEEK((byte)1), - COMMITTED((byte)2), - CHECKPOINTED((byte)3), - CONNECTED((byte)4), - DISCONNECTED((byte)5), - WINDOWED((byte)6), - SERVER_ERROR((byte)7); - - Command(byte b) - { - this.ord = b; - } - - public byte getOrdinal() - { - return ord; - } - - public static Command getCommand(byte b) - { - Command c; - switch (b) { - case 0: - c = ECHO; - break; - - case 1: - c = SEEK; - break; - - case 2: - c = COMMITTED; - break; - - case 3: - c = CHECKPOINTED; - break; - - case 4: - c = CONNECTED; - break; - - case 5: - c = DISCONNECTED; - break; - - case 6: - c = WINDOWED; - break; - - case 7: - c = SERVER_ERROR; - break; - - default: - throw new IllegalArgumentException(String.format("No Command defined for ordinal %b", b)); - } - - assert (b == c.ord); - return c; - } - - private final byte ord; - } - - public final ArrayList<Request> requests = new ArrayList<Request>(4); - - @Override - public ClientListener getClientConnection(SocketChannel sc, ServerSocketChannel ssc) - { - Client lClient = new Client(); - lClient.connected(); - return lClient; - } - - public class Client extends AbstractLengthPrependerClient - { - - @Override - public void onMessage(byte[] buffer, int offset, int size) - { - if (size != Request.FIXED_SIZE) { - logger.warn("Invalid Request Received: {} from {}", Arrays.copyOfRange(buffer, offset, offset + size), - key.channel()); - return; - } - - long requestTime = Server.readLong(buffer, offset + Request.TIME_OFFSET); - if (System.currentTimeMillis() > (requestTime + acceptedTolerance)) { - logger.warn("Expired Request Received: {} from {}", Arrays.copyOfRange(buffer, offset, offset + size), - key.channel()); - return; - } - - try { - if (Command.getCommand(buffer[offset]) == Command.ECHO) { - write(buffer, offset, size); - return; - } - } catch (IllegalArgumentException ex) { - logger.warn("Invalid Request Received: {} from {}!", Arrays.copyOfRange(buffer, offset, offset + size), - key.channel(), ex); - return; - } - - Request r = Request.getRequest(buffer, offset, this); - synchronized (requests) { - requests.add(r); - } - } - - @Override - public void disconnected() - { - synchronized (requests) { - requests.add(Request.getRequest( - new byte[] {Command.DISCONNECTED.getOrdinal(), 0, 0, 0, 0, 0, 0, 0, 0}, 0, this)); - } - super.disconnected(); - } - - public boolean write(byte[] address, Slice event) - { - if (event.offset == 0 && event.length == event.buffer.length) { - return write(address, event.buffer); - } - - // a better method would be to replace the write implementation and allow it to natively support writing slices - return write(address, event.toByteArray()); - } - - } - - public abstract static class Request - { - public static final int FIXED_SIZE = 17; - public static final int TIME_OFFSET = 9; - public final Command type; - public final Client client; - - public Request(Command type, Client client) - { - this.type = type; - this.client = client; - } - - public abstract Slice getAddress(); - - public abstract int getEventCount(); - - public abstract int getIdleCount(); - - @Override - public String toString() - { - return "Request{" + "type=" + type + '}'; - } - - public static Request getRequest(final byte[] buffer, final int offset, Client client) - { - Command command = 
Command.getCommand(buffer[offset]); - switch (command) { - case WINDOWED: - return new Request(Command.WINDOWED, client) - { - final int eventCount; - final int idleCount; - - { - eventCount = Server.readInt(buffer, offset + 1); - idleCount = Server.readInt(buffer, offset + 5); - } - - @Override - public Slice getAddress() - { - throw new UnsupportedOperationException(); - } - - @Override - public int getEventCount() - { - return eventCount; - } - - @Override - public int getIdleCount() - { - return idleCount; - } - - @Override - public String toString() - { - return "Request{" + "type=" + type + ", eventCount=" + eventCount + ", idleCount=" + idleCount + '}'; - } - - }; - - default: - return new Request(command, client) - { - final Slice address; - - { - address = new Slice(buffer, offset + 1, 8); - } - - @Override - public Slice getAddress() - { - return address; - } - - @Override - public int getEventCount() - { - throw new UnsupportedOperationException(); - } - - @Override - public int getIdleCount() - { - throw new UnsupportedOperationException(); - } - - @Override - public String toString() - { - return "Request{" + "type=" + type + ", address=" + address + '}'; - } - - }; - - } - - } - - } - - public static int readInt(byte[] buffer, int offset) - { - return buffer[offset++] & 0xff - | (buffer[offset++] & 0xff) << 8 - | (buffer[offset++] & 0xff) << 16 - | (buffer[offset++] & 0xff) << 24; - } - - public static void writeInt(byte[] buffer, int offset, int i) - { - buffer[offset++] = (byte)i; - buffer[offset++] = (byte)(i >>> 8); - buffer[offset++] = (byte)(i >>> 16); - buffer[offset++] = (byte)(i >>> 24); - } - - public static long readLong(byte[] buffer, int offset) - { - return (long)buffer[offset++] & 0xff - | (long)(buffer[offset++] & 0xff) << 8 - | (long)(buffer[offset++] & 0xff) << 16 - | (long)(buffer[offset++] & 0xff) << 24 - | (long)(buffer[offset++] & 0xff) << 32 - | (long)(buffer[offset++] & 0xff) << 40 - | (long)(buffer[offset++] & 0xff) << 48 - | (long)(buffer[offset++] & 0xff) << 56; - } - - public static void writeLong(byte[] buffer, int offset, long l) - { - buffer[offset++] = (byte)l; - buffer[offset++] = (byte)(l >>> 8); - buffer[offset++] = (byte)(l >>> 16); - buffer[offset++] = (byte)(l >>> 24); - buffer[offset++] = (byte)(l >>> 32); - buffer[offset++] = (byte)(l >>> 40); - buffer[offset++] = (byte)(l >>> 48); - buffer[offset++] = (byte)(l >>> 56); - } - - private static final Logger logger = LoggerFactory.getLogger(Server.class); -}
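
Editor's note (not part of the commit): the Server class removed above exchanges fixed-size 17-byte requests — a command byte at offset 0, an 8-byte address at offset 1, and the 8-byte request time at TIME_OFFSET = 9 — encoded with the little-endian readLong/writeLong helpers at the bottom of the file. The standalone sketch below is illustrative only; the class name and the sample address value are invented, but the byte layout and the encoding mirror the deleted helpers and the size/staleness checks in Client.onMessage().

// Illustrative sketch: build and decode a request in the layout used by the
// deleted Server class (FIXED_SIZE = 17, command at offset 0,
// 8-byte address at offset 1, 8-byte request time at TIME_OFFSET = 9).
public class RequestLayoutSketch
{
  // Little-endian encoding, matching the removed Server.writeLong().
  static void writeLong(byte[] buffer, int offset, long l)
  {
    for (int i = 0; i < 8; i++) {
      buffer[offset + i] = (byte)(l >>> (8 * i)); // least significant byte first
    }
  }

  // Little-endian decoding, matching the removed Server.readLong().
  static long readLong(byte[] buffer, int offset)
  {
    long l = 0;
    for (int i = 0; i < 8; i++) {
      l |= ((long)buffer[offset + i] & 0xff) << (8 * i);
    }
    return l;
  }

  public static void main(String[] args)
  {
    byte[] request = new byte[17];                      // Request.FIXED_SIZE
    request[0] = 1;                                     // Command.SEEK ordinal
    writeLong(request, 1, 42L);                         // hypothetical 8-byte address
    writeLong(request, 9, System.currentTimeMillis()); // Request.TIME_OFFSET

    // Server-side, Client.onMessage() drops requests that are not exactly
    // 17 bytes or whose request time is older than acceptedTolerance millis.
    long requestTime = readLong(request, 9);
    System.out.println("command=" + request[0]
        + " address=" + readLong(request, 1)
        + " ageMs=" + (System.currentTimeMillis() - requestTime));
  }
}
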
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d200737b/flume/src/main/java/com/datatorrent/flume/source/HdfsTestSource.java ---------------------------------------------------------------------- diff --git a/flume/src/main/java/com/datatorrent/flume/source/HdfsTestSource.java b/flume/src/main/java/com/datatorrent/flume/source/HdfsTestSource.java deleted file mode 100644 index 72e1913..0000000 --- a/flume/src/main/java/com/datatorrent/flume/source/HdfsTestSource.java +++ /dev/null @@ -1,224 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package com.datatorrent.flume.source; - -import java.io.BufferedReader; -import java.io.FileNotFoundException; -import java.io.IOException; -import java.io.InputStreamReader; -import java.text.SimpleDateFormat; -import java.util.Calendar; -import java.util.Date; -import java.util.List; -import java.util.Timer; -import java.util.TimerTask; - -import javax.annotation.Nonnull; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream; -import org.apache.flume.Context; -import org.apache.flume.Event; -import org.apache.flume.EventDrivenSource; -import org.apache.flume.channel.ChannelProcessor; -import org.apache.flume.conf.Configurable; -import org.apache.flume.event.EventBuilder; -import org.apache.flume.source.AbstractSource; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.LocatedFileStatus; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.RemoteIterator; - -import com.google.common.base.Preconditions; -import com.google.common.base.Strings; -import com.google.common.collect.Lists; - -/** - * <p>TestSource class.</p> - * - * @since 0.9.4 - */ -public class HdfsTestSource extends AbstractSource implements EventDrivenSource, Configurable -{ - public static final String SOURCE_DIR = "sourceDir"; - public static final String RATE = "rate"; - public static final String INIT_DATE = "initDate"; - - static byte FIELD_SEPARATOR = 2; - public Timer emitTimer; - @Nonnull - String directory; - Path directoryPath; - int rate; - String initDate; - long initTime; - List<String> dataFiles; - long oneDayBack; - - private transient BufferedReader br = null; - protected transient FileSystem fs; - private transient Configuration configuration; - - private transient int currentFile = 0; - private transient boolean finished; - private List<Event> events; - - public HdfsTestSource() - { - super(); - this.rate = 2500; - dataFiles = Lists.newArrayList(); - Calendar calendar = Calendar.getInstance(); - calendar.add(Calendar.DATE, -1); - oneDayBack = calendar.getTimeInMillis(); - configuration = new 
Configuration(); - events = Lists.newArrayList(); - } - - @Override - public void configure(Context context) - { - directory = context.getString(SOURCE_DIR); - rate = context.getInteger(RATE, rate); - initDate = context.getString(INIT_DATE); - - Preconditions.checkArgument(!Strings.isNullOrEmpty(directory)); - directoryPath = new Path(directory); - - String[] parts = initDate.split("-"); - Preconditions.checkArgument(parts.length == 3); - Calendar calendar = Calendar.getInstance(); - calendar.set(Integer.parseInt(parts[0]), Integer.parseInt(parts[1]) - 1, Integer.parseInt(parts[2]), 0, 0, 0); - initTime = calendar.getTimeInMillis(); - - try { - List<String> files = findFiles(); - for (String file : files) { - dataFiles.add(file); - } - if (logger.isDebugEnabled()) { - SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd"); - logger.debug("settings {} {} {} {} {}", directory, rate, dateFormat.format(oneDayBack), - dateFormat.format(new Date(initTime)), currentFile); - for (String file : dataFiles) { - logger.debug("settings add file {}", file); - } - } - - fs = FileSystem.newInstance(new Path(directory).toUri(), configuration); - Path filePath = new Path(dataFiles.get(currentFile)); - br = new BufferedReader(new InputStreamReader(new GzipCompressorInputStream(fs.open(filePath)))); - } catch (IOException e) { - throw new RuntimeException(e); - } - - finished = true; - - } - - private List<String> findFiles() throws IOException - { - List<String> files = Lists.newArrayList(); - Path directoryPath = new Path(directory); - FileSystem lfs = FileSystem.newInstance(directoryPath.toUri(), configuration); - try { - logger.debug("checking for new files in {}", directoryPath); - RemoteIterator<LocatedFileStatus> statuses = lfs.listFiles(directoryPath, true); - for (; statuses.hasNext(); ) { - FileStatus status = statuses.next(); - Path path = status.getPath(); - String filePathStr = path.toString(); - if (!filePathStr.endsWith(".gz")) { - continue; - } - logger.debug("new file {}", filePathStr); - files.add(path.toString()); - } - } catch (FileNotFoundException e) { - logger.warn("Failed to list directory {}", directoryPath, e); - throw new RuntimeException(e); - } finally { - lfs.close(); - } - return files; - } - - @Override - public void start() - { - super.start(); - emitTimer = new Timer(); - - final ChannelProcessor channelProcessor = getChannelProcessor(); - emitTimer.scheduleAtFixedRate(new TimerTask() - { - @Override - public void run() - { - int lineCount = 0; - events.clear(); - try { - while (lineCount < rate && !finished) { - String line = br.readLine(); - - if (line == null) { - logger.debug("completed file {}", currentFile); - br.close(); - currentFile++; - if (currentFile == dataFiles.size()) { - logger.info("finished all files"); - finished = true; - break; - } - Path filePath = new Path(dataFiles.get(currentFile)); - br = new BufferedReader(new InputStreamReader(new GzipCompressorInputStream(fs.open(filePath)))); - logger.info("opening file {}. 
{}", currentFile, filePath); - continue; - } - lineCount++; - Event flumeEvent = EventBuilder.withBody(line.getBytes()); - events.add(flumeEvent); - } - } catch (IOException e) { - throw new RuntimeException(e); - } - if (events.size() > 0) { - channelProcessor.processEventBatch(events); - } - if (finished) { - emitTimer.cancel(); - } - } - - }, 0, 1000); - } - - @Override - public void stop() - { - emitTimer.cancel(); - super.stop(); - } - - private static final Logger logger = LoggerFactory.getLogger(HdfsTestSource.class); -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d200737b/flume/src/main/java/com/datatorrent/flume/source/TestSource.java ---------------------------------------------------------------------- diff --git a/flume/src/main/java/com/datatorrent/flume/source/TestSource.java b/flume/src/main/java/com/datatorrent/flume/source/TestSource.java deleted file mode 100644 index 5773de3..0000000 --- a/flume/src/main/java/com/datatorrent/flume/source/TestSource.java +++ /dev/null @@ -1,250 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package com.datatorrent.flume.source; - -import java.io.BufferedReader; -import java.io.FileInputStream; -import java.io.FileNotFoundException; -import java.io.IOException; -import java.io.InputStreamReader; -import java.text.SimpleDateFormat; -import java.util.Calendar; -import java.util.HashMap; -import java.util.List; -import java.util.Random; -import java.util.Set; -import java.util.Timer; -import java.util.TimerTask; - -import javax.annotation.Nonnull; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.flume.Context; -import org.apache.flume.Event; -import org.apache.flume.EventDrivenSource; -import org.apache.flume.channel.ChannelProcessor; -import org.apache.flume.conf.Configurable; -import org.apache.flume.event.EventBuilder; -import org.apache.flume.source.AbstractSource; - -import com.google.common.base.Preconditions; -import com.google.common.base.Strings; -import com.google.common.collect.Lists; -import com.google.common.collect.Sets; - -/** - * <p>TestSource class.</p> - * - * @since 0.9.4 - */ -public class TestSource extends AbstractSource implements EventDrivenSource, Configurable -{ - public static final String SOURCE_FILE = "sourceFile"; - public static final String LINE_NUMBER = "lineNumber"; - public static final String RATE = "rate"; - public static final String PERCENT_PAST_EVENTS = "percentPastEvents"; - static byte FIELD_SEPARATOR = 1; - static int DEF_PERCENT_PAST_EVENTS = 5; - public Timer emitTimer; - @Nonnull - String filePath; - int rate; - int numberOfPastEvents; - transient List<Row> cache; - private transient int startIndex; - private transient Random random; - private SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd"); - private SimpleDateFormat timeFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); - - public TestSource() - { - super(); - this.rate = 2500; - this.numberOfPastEvents = DEF_PERCENT_PAST_EVENTS * 25; - this.random = new Random(); - - } - - @Override - public void configure(Context context) - { - filePath = context.getString(SOURCE_FILE); - rate = context.getInteger(RATE, rate); - int percentPastEvents = context.getInteger(PERCENT_PAST_EVENTS, DEF_PERCENT_PAST_EVENTS); - Preconditions.checkArgument(!Strings.isNullOrEmpty(filePath)); - try { - BufferedReader lineReader = new BufferedReader(new InputStreamReader(new FileInputStream(filePath))); - try { - buildCache(lineReader); - } finally { - lineReader.close(); - } - } catch (FileNotFoundException e) { - throw new RuntimeException(e); - } catch (IOException e) { - throw new RuntimeException(e); - } - - if (DEF_PERCENT_PAST_EVENTS != percentPastEvents) { - numberOfPastEvents = (int)(percentPastEvents / 100.0 * cache.size()); - } - } - - @Override - public void start() - { - super.start(); - emitTimer = new Timer(); - - final ChannelProcessor channel = getChannelProcessor(); - final int cacheSize = cache.size(); - emitTimer.scheduleAtFixedRate(new TimerTask() - { - @Override - public void run() - { - int lastIndex = startIndex + rate; - if (lastIndex > cacheSize) { - lastIndex -= cacheSize; - processBatch(channel, cache.subList(startIndex, cacheSize)); - startIndex = 0; - while (lastIndex > cacheSize) { - processBatch(channel, cache); - lastIndex -= cacheSize; - } - processBatch(channel, cache.subList(0, lastIndex)); - } else { - processBatch(channel, cache.subList(startIndex, lastIndex)); - } - startIndex = lastIndex; - } - - }, 0, 1000); - } - - private void processBatch(ChannelProcessor channelProcessor, List<Row> rows) - { - if 
(rows.isEmpty()) { - return; - } - - int noise = random.nextInt(numberOfPastEvents + 1); - Set<Integer> pastIndices = Sets.newHashSet(); - for (int i = 0; i < noise; i++) { - pastIndices.add(random.nextInt(rows.size())); - } - - Calendar calendar = Calendar.getInstance(); - long high = calendar.getTimeInMillis(); - calendar.add(Calendar.DATE, -2); - long low = calendar.getTimeInMillis(); - - - - List<Event> events = Lists.newArrayList(); - for (int i = 0; i < rows.size(); i++) { - Row eventRow = rows.get(i); - if (pastIndices.contains(i)) { - long pastTime = (long)((Math.random() * (high - low)) + low); - byte[] pastDateField = dateFormat.format(pastTime).getBytes(); - byte[] pastTimeField = timeFormat.format(pastTime).getBytes(); - - System.arraycopy(pastDateField, 0, eventRow.bytes, eventRow.dateFieldStart, pastDateField.length); - System.arraycopy(pastTimeField, 0, eventRow.bytes, eventRow.timeFieldStart, pastTimeField.length); - } else { - calendar.setTimeInMillis(System.currentTimeMillis()); - byte[] currentDateField = dateFormat.format(calendar.getTime()).getBytes(); - byte[] currentTimeField = timeFormat.format(calendar.getTime()).getBytes(); - - System.arraycopy(currentDateField, 0, eventRow.bytes, eventRow.dateFieldStart, currentDateField.length); - System.arraycopy(currentTimeField, 0, eventRow.bytes, eventRow.timeFieldStart, currentTimeField.length); - } - - HashMap<String, String> headers = new HashMap<String, String>(2); - headers.put(SOURCE_FILE, filePath); - headers.put(LINE_NUMBER, String.valueOf(startIndex + i)); - events.add(EventBuilder.withBody(eventRow.bytes, headers)); - } - channelProcessor.processEventBatch(events); - } - - @Override - public void stop() - { - emitTimer.cancel(); - super.stop(); - } - - private void buildCache(BufferedReader lineReader) throws IOException - { - cache = Lists.newArrayListWithCapacity(rate); - - String line; - while ((line = lineReader.readLine()) != null) { - byte[] row = line.getBytes(); - Row eventRow = new Row(row); - final int rowsize = row.length; - - /* guid */ - int sliceLengh = -1; - while (++sliceLengh < rowsize) { - if (row[sliceLengh] == FIELD_SEPARATOR) { - break; - } - } - int recordStart = sliceLengh + 1; - int pointer = sliceLengh + 1; - while (pointer < rowsize) { - if (row[pointer++] == FIELD_SEPARATOR) { - eventRow.dateFieldStart = recordStart; - break; - } - } - - /* lets parse the date */ - int dateStart = pointer; - while (pointer < rowsize) { - if (row[pointer++] == FIELD_SEPARATOR) { - eventRow.timeFieldStart = dateStart; - break; - } - } - - cache.add(eventRow); - } - } - - private static class Row - { - final byte[] bytes; - int dateFieldStart; - int timeFieldStart; -// boolean past; - - Row(byte[] bytes) - { - this.bytes = bytes; - } - - } - - private static final Logger logger = LoggerFactory.getLogger(TestSource.class); -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d200737b/flume/src/main/java/com/datatorrent/flume/storage/DebugWrapper.java ---------------------------------------------------------------------- diff --git a/flume/src/main/java/com/datatorrent/flume/storage/DebugWrapper.java b/flume/src/main/java/com/datatorrent/flume/storage/DebugWrapper.java deleted file mode 100644 index da94154..0000000 --- a/flume/src/main/java/com/datatorrent/flume/storage/DebugWrapper.java +++ /dev/null @@ -1,132 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package com.datatorrent.flume.storage; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.flume.Context; -import org.apache.flume.conf.Configurable; - -import com.datatorrent.api.Component; -import com.datatorrent.netlet.util.Slice; - -/** - * <p>DebugWrapper class.</p> - * - * @since 0.9.4 - */ -public class DebugWrapper implements Storage, Configurable, Component<com.datatorrent.api.Context> -{ - HDFSStorage storage = new HDFSStorage(); - - @Override - public byte[] store(Slice bytes) - { - byte[] ret = null; - - try { - ret = storage.store(bytes); - } finally { - logger.debug("storage.store(new byte[]{{}});", bytes); - } - - return ret; - } - - @Override - public byte[] retrieve(byte[] identifier) - { - byte[] ret = null; - - try { - ret = storage.retrieve(identifier); - } finally { - logger.debug("storage.retrieve(new byte[]{{}});", identifier); - } - - return ret; - } - - @Override - public byte[] retrieveNext() - { - byte[] ret = null; - try { - ret = storage.retrieveNext(); - } finally { - logger.debug("storage.retrieveNext();"); - } - - return ret; - } - - @Override - public void clean(byte[] identifier) - { - try { - storage.clean(identifier); - } finally { - logger.debug("storage.clean(new byte[]{{}});", identifier); - } - } - - @Override - public void flush() - { - try { - storage.flush(); - } finally { - logger.debug("storage.flush();"); - } - } - - @Override - public void configure(Context cntxt) - { - try { - storage.configure(cntxt); - } finally { - logger.debug("storage.configure({});", cntxt); - } - } - - @Override - public void setup(com.datatorrent.api.Context t1) - { - try { - storage.setup(t1); - } finally { - logger.debug("storage.setup({});", t1); - } - - } - - @Override - public void teardown() - { - try { - storage.teardown(); - } finally { - logger.debug("storage.teardown();"); - } - } - - private static final Logger logger = LoggerFactory.getLogger(DebugWrapper.class); -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d200737b/flume/src/main/java/com/datatorrent/flume/storage/ErrorMaskingEventCodec.java ---------------------------------------------------------------------- diff --git a/flume/src/main/java/com/datatorrent/flume/storage/ErrorMaskingEventCodec.java b/flume/src/main/java/com/datatorrent/flume/storage/ErrorMaskingEventCodec.java deleted file mode 100644 index 76f663c..0000000 --- a/flume/src/main/java/com/datatorrent/flume/storage/ErrorMaskingEventCodec.java +++ /dev/null @@ -1,62 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package com.datatorrent.flume.storage; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.flume.Event; - -import com.datatorrent.netlet.util.Slice; - -/** - * <p>ErrorMaskingEventCodec class.</p> - * - * @since 1.0.4 - */ -public class ErrorMaskingEventCodec extends EventCodec -{ - - @Override - public Object fromByteArray(Slice fragment) - { - try { - return super.fromByteArray(fragment); - } catch (RuntimeException re) { - logger.warn("Cannot deserialize event {}", fragment, re); - } - - return null; - } - - @Override - public Slice toByteArray(Event event) - { - try { - return super.toByteArray(event); - } catch (RuntimeException re) { - logger.warn("Cannot serialize event {}", event, re); - } - - return null; - } - - - private static final Logger logger = LoggerFactory.getLogger(ErrorMaskingEventCodec.class); -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d200737b/flume/src/main/java/com/datatorrent/flume/storage/EventCodec.java ---------------------------------------------------------------------- diff --git a/flume/src/main/java/com/datatorrent/flume/storage/EventCodec.java b/flume/src/main/java/com/datatorrent/flume/storage/EventCodec.java deleted file mode 100644 index 0ece548..0000000 --- a/flume/src/main/java/com/datatorrent/flume/storage/EventCodec.java +++ /dev/null @@ -1,92 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package com.datatorrent.flume.storage; - -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.util.HashMap; -import java.util.Map; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.flume.Event; -import org.apache.flume.event.EventBuilder; - -import com.esotericsoftware.kryo.Kryo; -import com.esotericsoftware.kryo.io.Input; -import com.esotericsoftware.kryo.io.Output; - -import com.datatorrent.api.StreamCodec; -import com.datatorrent.netlet.util.Slice; - -/** - * <p>EventCodec class.</p> - * - * @since 0.9.4 - */ -public class EventCodec implements StreamCodec<Event> -{ - private final transient Kryo kryo; - - public EventCodec() - { - this.kryo = new Kryo(); - this.kryo.setClassLoader(Thread.currentThread().getContextClassLoader()); - } - - @Override - public Object fromByteArray(Slice fragment) - { - ByteArrayInputStream is = new ByteArrayInputStream(fragment.buffer, fragment.offset, fragment.length); - Input input = new Input(is); - - @SuppressWarnings("unchecked") - HashMap<String, String> headers = kryo.readObjectOrNull(input, HashMap.class); - byte[] body = kryo.readObjectOrNull(input, byte[].class); - return EventBuilder.withBody(body, headers); - } - - @Override - public Slice toByteArray(Event event) - { - ByteArrayOutputStream os = new ByteArrayOutputStream(); - Output output = new Output(os); - - Map<String, String> headers = event.getHeaders(); - if (headers != null && headers.getClass() != HashMap.class) { - HashMap<String, String> tmp = new HashMap<String, String>(headers.size()); - tmp.putAll(headers); - headers = tmp; - } - kryo.writeObjectOrNull(output, headers, HashMap.class); - kryo.writeObjectOrNull(output, event.getBody(), byte[].class); - output.flush(); - final byte[] bytes = os.toByteArray(); - return new Slice(bytes, 0, bytes.length); - } - - @Override - public int getPartition(Event o) - { - return o.hashCode(); - } - - private static final Logger logger = LoggerFactory.getLogger(EventCodec.class); -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d200737b/flume/src/main/java/com/datatorrent/flume/storage/HDFSStorage.java ---------------------------------------------------------------------- diff --git a/flume/src/main/java/com/datatorrent/flume/storage/HDFSStorage.java b/flume/src/main/java/com/datatorrent/flume/storage/HDFSStorage.java deleted file mode 100644 index 4dcddcd..0000000 --- a/flume/src/main/java/com/datatorrent/flume/storage/HDFSStorage.java +++ /dev/null @@ -1,947 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package com.datatorrent.flume.storage; - -import java.io.DataInputStream; -import java.io.FileOutputStream; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Iterator; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; - -import javax.validation.constraints.NotNull; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.flume.Context; -import org.apache.flume.conf.Configurable; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; - -import com.google.common.primitives.Ints; -import com.google.common.primitives.Longs; - -import com.datatorrent.api.Component; -import com.datatorrent.common.util.NameableThreadFactory; -import com.datatorrent.flume.sink.Server; -import com.datatorrent.netlet.util.Slice; - -/** - * HDFSStorage is developed to store and retrieve the data from HDFS - * <p /> - * The properties that can be set on HDFSStorage are: <br /> - * baseDir - The base directory where the data is going to be stored <br /> - * restore - This is used to restore the application from previous failure <br /> - * blockSize - The maximum size of the each file to created. <br /> - * - * @since 0.9.3 - */ -public class HDFSStorage implements Storage, Configurable, Component<com.datatorrent.api.Context> -{ - public static final int DEFAULT_BLOCK_SIZE = 64 * 1024 * 1024; - public static final String BASE_DIR_KEY = "baseDir"; - public static final String RESTORE_KEY = "restore"; - public static final String BLOCKSIZE = "blockSize"; - public static final String BLOCK_SIZE_MULTIPLE = "blockSizeMultiple"; - public static final String NUMBER_RETRY = "retryCount"; - - private static final String OFFSET_SUFFIX = "-offsetFile"; - private static final String BOOK_KEEPING_FILE_OFFSET = "-bookKeepingOffsetFile"; - private static final String FLUSHED_IDENTITY_FILE = "flushedCounter"; - private static final String CLEAN_OFFSET_FILE = "cleanoffsetFile"; - private static final String FLUSHED_IDENTITY_FILE_TEMP = "flushedCounter.tmp"; - private static final String CLEAN_OFFSET_FILE_TEMP = "cleanoffsetFile.tmp"; - private static final int IDENTIFIER_SIZE = 8; - private static final int DATA_LENGTH_BYTE_SIZE = 4; - - /** - * Number of times the storage will try to get the filesystem - */ - private int retryCount = 3; - /** - * The multiple of block size - */ - private int blockSizeMultiple = 1; - /** - * Identifier for this storage. - */ - @NotNull - private String id; - /** - * The baseDir where the storage facility is going to create files. 
- */ - @NotNull - private String baseDir; - /** - * The block size to be used to create the storage files - */ - private long blockSize; - /** - * - */ - private boolean restore; - /** - * This identifies the current file number - */ - private long currentWrittenFile; - /** - * This identifies the file number that has been flushed - */ - private long flushedFileCounter; - /** - * The file that stores the fileCounter information - */ - // private Path fileCounterFile; - /** - * The file that stores the flushed fileCounter information - */ - private Path flushedCounterFile; - private Path flushedCounterFileTemp; - /** - * This identifies the last cleaned file number - */ - private long cleanedFileCounter; - /** - * The file that stores the clean file counter information - */ - // private Path cleanFileCounterFile; - /** - * The file that stores the clean file offset information - */ - private Path cleanFileOffsetFile; - private Path cleanFileOffsetFileTemp; - private FileSystem fs; - private FSDataOutputStream dataStream; - ArrayList<DataBlock> files2Commit = new ArrayList<DataBlock>(); - /** - * The offset in the current opened file - */ - private long fileWriteOffset; - private FSDataInputStream readStream; - private long retrievalOffset; - private long retrievalFile; - private int offset; - private long flushedLong; - private long flushedFileWriteOffset; - private long bookKeepingFileOffset; - private byte[] cleanedOffset = new byte[8]; - private long skipOffset; - private long skipFile; - private transient Path basePath; - private ExecutorService storageExecutor; - private byte[] currentData; - private FSDataInputStream nextReadStream; - private long nextFlushedLong; - private long nextRetrievalFile; - private byte[] nextRetrievalData; - - public HDFSStorage() - { - this.restore = true; - } - - /** - * This stores the Identifier information identified in the last store function call - * - * @param ctx - */ - @Override - public void configure(Context ctx) - { - String tempId = ctx.getString(ID); - if (tempId == null) { - if (id == null) { - throw new IllegalArgumentException("id can't be null."); - } - } else { - id = tempId; - } - - String tempBaseDir = ctx.getString(BASE_DIR_KEY); - if (tempBaseDir != null) { - baseDir = tempBaseDir; - } - - restore = ctx.getBoolean(RESTORE_KEY, restore); - Long tempBlockSize = ctx.getLong(BLOCKSIZE); - if (tempBlockSize != null) { - blockSize = tempBlockSize; - } - blockSizeMultiple = ctx.getInteger(BLOCK_SIZE_MULTIPLE, blockSizeMultiple); - retryCount = ctx.getInteger(NUMBER_RETRY,retryCount); - } - - /** - * This function reads the file at a location and return the bytes stored in the file " - * - * @param path - the location of the file - * @return - * @throws IOException - */ - byte[] readData(Path path) throws IOException - { - DataInputStream is = new DataInputStream(fs.open(path)); - byte[] bytes = new byte[is.available()]; - is.readFully(bytes); - is.close(); - return bytes; - } - - /** - * This function writes the bytes to a file specified by the path - * - * @param path the file location - * @param data the data to be written to the file - * @return - * @throws IOException - */ - private FSDataOutputStream writeData(Path path, byte[] data) throws IOException - { - FSDataOutputStream fsOutputStream; - if (fs.getScheme().equals("file")) { - // local FS does not support hflush and does not flush native stream - fsOutputStream = new FSDataOutputStream( - new FileOutputStream(Path.getPathWithoutSchemeAndAuthority(path).toString()), null); - } else 
{ - fsOutputStream = fs.create(path); - } - fsOutputStream.write(data); - return fsOutputStream; - } - - private long calculateOffset(long fileOffset, long fileCounter) - { - return ((fileCounter << 32) | (fileOffset & 0xffffffffL)); - } - - @Override - public byte[] store(Slice slice) - { - // logger.debug("store message "); - int bytesToWrite = slice.length + DATA_LENGTH_BYTE_SIZE; - if (currentWrittenFile < skipFile) { - fileWriteOffset += bytesToWrite; - if (fileWriteOffset >= bookKeepingFileOffset) { - files2Commit.add(new DataBlock(null, bookKeepingFileOffset, - new Path(basePath, currentWrittenFile + OFFSET_SUFFIX), currentWrittenFile)); - currentWrittenFile++; - if (fileWriteOffset > bookKeepingFileOffset) { - fileWriteOffset = bytesToWrite; - } else { - fileWriteOffset = 0; - } - try { - bookKeepingFileOffset = getFlushedFileWriteOffset( - new Path(basePath, currentWrittenFile + BOOK_KEEPING_FILE_OFFSET)); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - return null; - } - - if (flushedFileCounter == currentWrittenFile && dataStream == null) { - currentWrittenFile++; - fileWriteOffset = 0; - } - - if (flushedFileCounter == skipFile && skipFile != -1) { - skipFile++; - } - - if (fileWriteOffset + bytesToWrite < blockSize) { - try { - /* write length and the actual data to the file */ - if (fileWriteOffset == 0) { - // writeData(flushedCounterFile, String.valueOf(currentWrittenFile).getBytes()).close(); - dataStream = writeData(new Path(basePath, String.valueOf(currentWrittenFile)), - Ints.toByteArray(slice.length)); - dataStream.write(slice.buffer, slice.offset, slice.length); - } else { - dataStream.write(Ints.toByteArray(slice.length)); - dataStream.write(slice.buffer, slice.offset, slice.length); - } - fileWriteOffset += bytesToWrite; - - byte[] fileOffset = null; - if ((currentWrittenFile > skipFile) || (currentWrittenFile == skipFile && fileWriteOffset > skipOffset)) { - skipFile = -1; - fileOffset = new byte[IDENTIFIER_SIZE]; - Server.writeLong(fileOffset, 0, calculateOffset(fileWriteOffset, currentWrittenFile)); - } - return fileOffset; - } catch (IOException ex) { - logger.warn("Error while storing the bytes {}", ex.getMessage()); - closeFs(); - throw new RuntimeException(ex); - } - } - DataBlock db = new DataBlock(dataStream, fileWriteOffset, - new Path(basePath, currentWrittenFile + OFFSET_SUFFIX), currentWrittenFile); - db.close(); - files2Commit.add(db); - fileWriteOffset = 0; - ++currentWrittenFile; - return store(slice); - } - - /** - * @param b - * @param startIndex - * @return - */ - long byteArrayToLong(byte[] b, int startIndex) - { - final byte b1 = 0; - return Longs.fromBytes(b1, b1, b1, b1, b[3 + startIndex], b[2 + startIndex], b[1 + startIndex], b[startIndex]); - } - - @Override - public byte[] retrieve(byte[] identifier) - { - skipFile = -1; - skipOffset = 0; - logger.debug("retrieve with address {}", Arrays.toString(identifier)); - // flushing the last incomplete flushed file - closeUnflushedFiles(); - - retrievalOffset = byteArrayToLong(identifier, 0); - retrievalFile = byteArrayToLong(identifier, offset); - - if (retrievalFile == 0 && retrievalOffset == 0 && currentWrittenFile == 0 && fileWriteOffset == 0) { - skipOffset = 0; - return null; - } - - // making sure that the deleted address is not requested again - if (retrievalFile != 0 || retrievalOffset != 0) { - long cleanedFile = byteArrayToLong(cleanedOffset, offset); - if (retrievalFile < cleanedFile || (retrievalFile == cleanedFile && - retrievalOffset < 
byteArrayToLong(cleanedOffset, 0))) { - logger.warn("The address asked has been deleted retrievalFile={}, cleanedFile={}, retrievalOffset={}, " + - "cleanedOffset={}", retrievalFile, cleanedFile, retrievalOffset, byteArrayToLong(cleanedOffset, 0)); - closeFs(); - throw new IllegalArgumentException(String.format("The data for address %s has already been deleted", - Arrays.toString(identifier))); - } - } - - // we have just started - if (retrievalFile == 0 && retrievalOffset == 0) { - retrievalFile = byteArrayToLong(cleanedOffset, offset); - retrievalOffset = byteArrayToLong(cleanedOffset, 0); - } - - if ((retrievalFile > flushedFileCounter)) { - skipFile = retrievalFile; - skipOffset = retrievalOffset; - retrievalFile = -1; - return null; - } - if ((retrievalFile == flushedFileCounter && retrievalOffset >= flushedFileWriteOffset)) { - skipFile = retrievalFile; - skipOffset = retrievalOffset - flushedFileWriteOffset; - retrievalFile = -1; - return null; - } - - try { - if (readStream != null) { - readStream.close(); - readStream = null; - } - Path path = new Path(basePath, String.valueOf(retrievalFile)); - if (!fs.exists(path)) { - retrievalFile = -1; - closeFs(); - throw new RuntimeException(String.format("File %s does not exist", path.toString())); - } - - byte[] flushedOffset = readData(new Path(basePath, retrievalFile + OFFSET_SUFFIX)); - flushedLong = Server.readLong(flushedOffset, 0); - while (retrievalOffset >= flushedLong && retrievalFile < flushedFileCounter) { - retrievalOffset -= flushedLong; - retrievalFile++; - flushedOffset = readData(new Path(basePath, retrievalFile + OFFSET_SUFFIX)); - flushedLong = Server.readLong(flushedOffset, 0); - } - - if (retrievalOffset >= flushedLong) { - logger.warn("data not flushed for the given identifier"); - retrievalFile = -1; - return null; - } - synchronized (HDFSStorage.this) { - if (nextReadStream != null) { - nextReadStream.close(); - nextReadStream = null; - } - } - currentData = null; - path = new Path(basePath, String.valueOf(retrievalFile)); - //readStream = new FSDataInputStream(fs.open(path)); - currentData = readData(path); - //readStream.seek(retrievalOffset); - storageExecutor.submit(getNextStream()); - return retrieveHelper(); - } catch (IOException e) { - closeFs(); - throw new RuntimeException(e); - } - } - - private byte[] retrieveHelper() throws IOException - { - int tempRetrievalOffset = (int)retrievalOffset; - int length = Ints.fromBytes(currentData[tempRetrievalOffset], currentData[tempRetrievalOffset + 1], - currentData[tempRetrievalOffset + 2], currentData[tempRetrievalOffset + 3]); - byte[] data = new byte[length + IDENTIFIER_SIZE]; - System.arraycopy(currentData, tempRetrievalOffset + 4, data, IDENTIFIER_SIZE, length); - retrievalOffset += length + DATA_LENGTH_BYTE_SIZE; - if (retrievalOffset >= flushedLong) { - Server.writeLong(data, 0, calculateOffset(0, retrievalFile + 1)); - } else { - Server.writeLong(data, 0, calculateOffset(retrievalOffset, retrievalFile)); - } - return data; - } - - @Override - public byte[] retrieveNext() - { - if (retrievalFile == -1) { - closeFs(); - throw new RuntimeException("Call retrieve first"); - } - - if (retrievalFile > flushedFileCounter) { - logger.warn("data is not flushed"); - return null; - } - - try { - if (currentData == null) { - synchronized (HDFSStorage.this) { - if (nextRetrievalData != null && (retrievalFile == nextRetrievalFile)) { - currentData = nextRetrievalData; - flushedLong = nextFlushedLong; - nextRetrievalData = null; - } else { - currentData = null; - 
currentData = readData(new Path(basePath, String.valueOf(retrievalFile))); - byte[] flushedOffset = readData(new Path(basePath, retrievalFile + OFFSET_SUFFIX)); - flushedLong = Server.readLong(flushedOffset, 0); - } - } - storageExecutor.submit(getNextStream()); - } - - if (retrievalOffset >= flushedLong) { - retrievalFile++; - retrievalOffset = 0; - - if (retrievalFile > flushedFileCounter) { - logger.warn("data is not flushed"); - return null; - } - - //readStream.close(); - // readStream = new FSDataInputStream(fs.open(new Path(basePath, String.valueOf(retrievalFile)))); - // byte[] flushedOffset = readData(new Path(basePath, retrievalFile + OFFSET_SUFFIX)); - // flushedLong = Server.readLong(flushedOffset, 0); - - synchronized (HDFSStorage.this) { - if (nextRetrievalData != null && (retrievalFile == nextRetrievalFile)) { - currentData = nextRetrievalData; - flushedLong = nextFlushedLong; - nextRetrievalData = null; - } else { - currentData = null; - currentData = readData(new Path(basePath, String.valueOf(retrievalFile))); - byte[] flushedOffset = readData(new Path(basePath, retrievalFile + OFFSET_SUFFIX)); - flushedLong = Server.readLong(flushedOffset, 0); - } - } - storageExecutor.submit(getNextStream()); - } - //readStream.seek(retrievalOffset); - return retrieveHelper(); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - - @Override - @SuppressWarnings("AssignmentToCollectionOrArrayFieldFromParameter") - public void clean(byte[] identifier) - { - logger.info("clean {}", Arrays.toString(identifier)); - long cleanFileIndex = byteArrayToLong(identifier, offset); - - long cleanFileOffset = byteArrayToLong(identifier, 0); - if (flushedFileCounter == -1) { - identifier = new byte[8]; - } else if (cleanFileIndex > flushedFileCounter || - (cleanFileIndex == flushedFileCounter && cleanFileOffset >= flushedFileWriteOffset)) { - // This is to make sure that we clean only the data that is flushed - cleanFileIndex = flushedFileCounter; - cleanFileOffset = flushedFileWriteOffset; - Server.writeLong(identifier, 0, calculateOffset(cleanFileOffset, cleanFileIndex)); - } - cleanedOffset = identifier; - - try { - writeData(cleanFileOffsetFileTemp, identifier).close(); - fs.rename(cleanFileOffsetFileTemp, cleanFileOffsetFile); - if (cleanedFileCounter >= cleanFileIndex) { - return; - } - do { - Path path = new Path(basePath, String.valueOf(cleanedFileCounter)); - if (fs.exists(path) && fs.isFile(path)) { - fs.delete(path, false); - } - path = new Path(basePath, cleanedFileCounter + OFFSET_SUFFIX); - if (fs.exists(path) && fs.isFile(path)) { - fs.delete(path, false); - } - path = new Path(basePath, cleanedFileCounter + BOOK_KEEPING_FILE_OFFSET); - if (fs.exists(path) && fs.isFile(path)) { - fs.delete(path, false); - } - logger.info("deleted file {}", cleanedFileCounter); - ++cleanedFileCounter; - } while (cleanedFileCounter < cleanFileIndex); - // writeData(cleanFileCounterFile, String.valueOf(cleanedFileCounter).getBytes()).close(); - - } catch (IOException e) { - logger.warn("not able to close the streams {}", e.getMessage()); - closeFs(); - throw new RuntimeException(e); - } - } - - /** - * This is used mainly for cleaning up of counter files created - */ - void cleanHelperFiles() - { - try { - fs.delete(basePath, true); - } catch (IOException e) { - logger.warn(e.getMessage()); - } - } - - private void closeUnflushedFiles() - { - try { - files2Commit.clear(); - // closing the stream - if (dataStream != null) { - dataStream.close(); - dataStream = null; - // currentWrittenFile++; 
- // fileWriteOffset = 0; - } - - if (!fs.exists(new Path(basePath, currentWrittenFile + OFFSET_SUFFIX))) { - fs.delete(new Path(basePath, String.valueOf(currentWrittenFile)), false); - } - - if (fs.exists(new Path(basePath, flushedFileCounter + OFFSET_SUFFIX))) { - // This means that flush was called - flushedFileWriteOffset = getFlushedFileWriteOffset(new Path(basePath, flushedFileCounter + OFFSET_SUFFIX)); - bookKeepingFileOffset = getFlushedFileWriteOffset( - new Path(basePath, flushedFileCounter + BOOK_KEEPING_FILE_OFFSET)); - } - - if (flushedFileCounter != -1) { - currentWrittenFile = flushedFileCounter; - fileWriteOffset = flushedFileWriteOffset; - } else { - currentWrittenFile = 0; - fileWriteOffset = 0; - } - - flushedLong = 0; - - } catch (IOException e) { - closeFs(); - throw new RuntimeException(e); - } - } - - @Override - public void flush() - { - nextReadStream = null; - StringBuilder builder = new StringBuilder(); - Iterator<DataBlock> itr = files2Commit.iterator(); - DataBlock db; - try { - while (itr.hasNext()) { - db = itr.next(); - db.updateOffsets(); - builder.append(db.fileName).append(", "); - } - files2Commit.clear(); - - if (dataStream != null) { - dataStream.hflush(); - writeData(flushedCounterFileTemp, String.valueOf(currentWrittenFile).getBytes()).close(); - fs.rename(flushedCounterFileTemp, flushedCounterFile); - updateFlushedOffset(new Path(basePath, currentWrittenFile + OFFSET_SUFFIX), fileWriteOffset); - flushedFileWriteOffset = fileWriteOffset; - builder.append(currentWrittenFile); - } - logger.debug("flushed files {}", builder.toString()); - } catch (IOException ex) { - logger.warn("not able to close the stream {}", ex.getMessage()); - closeFs(); - throw new RuntimeException(ex); - } - flushedFileCounter = currentWrittenFile; - // logger.debug("flushedFileCounter in flush {}",flushedFileCounter); - } - - /** - * This updates the flushed offset - */ - private void updateFlushedOffset(Path file, long bytesWritten) - { - byte[] lastStoredOffset = new byte[IDENTIFIER_SIZE]; - Server.writeLong(lastStoredOffset, 0, bytesWritten); - try { - writeData(file, lastStoredOffset).close(); - } catch (IOException e) { - try { - if (!Arrays.equals(readData(file), lastStoredOffset)) { - closeFs(); - throw new RuntimeException(e); - } - } catch (Exception e1) { - closeFs(); - throw new RuntimeException(e1); - } - } - } - - public int getBlockSizeMultiple() - { - return blockSizeMultiple; - } - - public void setBlockSizeMultiple(int blockSizeMultiple) - { - this.blockSizeMultiple = blockSizeMultiple; - } - - /** - * @return the baseDir - */ - public String getBaseDir() - { - return baseDir; - } - - /** - * @param baseDir the baseDir to set - */ - public void setBaseDir(String baseDir) - { - this.baseDir = baseDir; - } - - /** - * @return the id - */ - public String getId() - { - return id; - } - - /** - * @param id the id to set - */ - public void setId(String id) - { - this.id = id; - } - - /** - * @return the blockSize - */ - public long getBlockSize() - { - return blockSize; - } - - /** - * @param blockSize the blockSize to set - */ - public void setBlockSize(long blockSize) - { - this.blockSize = blockSize; - } - - /** - * @return the restore - */ - public boolean isRestore() - { - return restore; - } - - /** - * @param restore the restore to set - */ - public void setRestore(boolean restore) - { - this.restore = restore; - } - - class DataBlock - { - FSDataOutputStream dataStream; - long dataOffset; - Path path2FlushedData; - long fileName; - private Path 
bookKeepingPath; - - DataBlock(FSDataOutputStream stream, long bytesWritten, Path path2FlushedData, long fileName) - { - this.dataStream = stream; - this.dataOffset = bytesWritten; - this.path2FlushedData = path2FlushedData; - this.fileName = fileName; - } - - public void close() - { - if (dataStream != null) { - try { - dataStream.close(); - bookKeepingPath = new Path(basePath, fileName + BOOK_KEEPING_FILE_OFFSET); - updateFlushedOffset(bookKeepingPath, dataOffset); - } catch (IOException ex) { - logger.warn("not able to close the stream {}", ex.getMessage()); - closeFs(); - throw new RuntimeException(ex); - } - } - } - - public void updateOffsets() throws IOException - { - updateFlushedOffset(path2FlushedData, dataOffset); - if (bookKeepingPath != null && fs.exists(bookKeepingPath)) { - fs.delete(bookKeepingPath, false); - } - } - - } - - private static final Logger logger = LoggerFactory.getLogger(HDFSStorage.class); - - @Override - public void setup(com.datatorrent.api.Context context) - { - Configuration conf = new Configuration(); - if (baseDir == null) { - baseDir = conf.get("hadoop.tmp.dir"); - if (baseDir == null || baseDir.isEmpty()) { - throw new IllegalArgumentException("baseDir cannot be null."); - } - } - offset = 4; - skipOffset = -1; - skipFile = -1; - int tempRetryCount = 0; - while (tempRetryCount < retryCount && fs == null) { - try { - fs = FileSystem.newInstance(conf); - tempRetryCount++; - } catch (Throwable throwable) { - logger.warn("Not able to get file system ", throwable); - } - } - - try { - Path path = new Path(baseDir); - basePath = new Path(path, id); - if (fs == null) { - fs = FileSystem.newInstance(conf); - } - if (!fs.exists(path)) { - closeFs(); - throw new RuntimeException(String.format("baseDir passed (%s) doesn't exist.", baseDir)); - } - if (!fs.isDirectory(path)) { - closeFs(); - throw new RuntimeException(String.format("baseDir passed (%s) is not a directory.", baseDir)); - } - if (!restore) { - fs.delete(basePath, true); - } - if (!fs.exists(basePath) || !fs.isDirectory(basePath)) { - fs.mkdirs(basePath); - } - - if (blockSize == 0) { - blockSize = fs.getDefaultBlockSize(new Path(basePath, "tempData")); - } - if (blockSize == 0) { - blockSize = DEFAULT_BLOCK_SIZE; - } - - blockSize = blockSizeMultiple * blockSize; - - currentWrittenFile = 0; - cleanedFileCounter = -1; - retrievalFile = -1; - // fileCounterFile = new Path(basePath, IDENTITY_FILE); - flushedFileCounter = -1; - // cleanFileCounterFile = new Path(basePath, CLEAN_FILE); - cleanFileOffsetFile = new Path(basePath, CLEAN_OFFSET_FILE); - cleanFileOffsetFileTemp = new Path(basePath, CLEAN_OFFSET_FILE_TEMP); - flushedCounterFile = new Path(basePath, FLUSHED_IDENTITY_FILE); - flushedCounterFileTemp = new Path(basePath, FLUSHED_IDENTITY_FILE_TEMP); - - if (restore) { - // - // if (fs.exists(fileCounterFile) && fs.isFile(fileCounterFile)) { - // //currentWrittenFile = Long.valueOf(new String(readData(fileCounterFile))); - // } - - if (fs.exists(cleanFileOffsetFile) && fs.isFile(cleanFileOffsetFile)) { - cleanedOffset = readData(cleanFileOffsetFile); - } - - if (fs.exists(flushedCounterFile) && fs.isFile(flushedCounterFile)) { - String strFlushedFileCounter = new String(readData(flushedCounterFile)); - if (strFlushedFileCounter.isEmpty()) { - logger.warn("empty flushed file"); - } else { - flushedFileCounter = Long.valueOf(strFlushedFileCounter); - flushedFileWriteOffset = getFlushedFileWriteOffset(new Path(basePath, flushedFileCounter + OFFSET_SUFFIX)); - bookKeepingFileOffset = 
getFlushedFileWriteOffset( - new Path(basePath, flushedFileCounter + BOOK_KEEPING_FILE_OFFSET)); - } - - } - } - fileWriteOffset = flushedFileWriteOffset; - currentWrittenFile = flushedFileCounter; - cleanedFileCounter = byteArrayToLong(cleanedOffset, offset) - 1; - if (currentWrittenFile == -1) { - ++currentWrittenFile; - fileWriteOffset = 0; - } - - } catch (IOException io) { - - throw new RuntimeException(io); - } - storageExecutor = Executors.newSingleThreadExecutor(new NameableThreadFactory("StorageHelper")); - } - - private void closeFs() - { - if (fs != null) { - try { - fs.close(); - fs = null; - } catch (IOException e) { - logger.debug(e.getMessage()); - } - } - } - - private long getFlushedFileWriteOffset(Path filePath) throws IOException - { - if (flushedFileCounter != -1 && fs.exists(filePath)) { - byte[] flushedFileOffsetByte = readData(filePath); - if (flushedFileOffsetByte != null && flushedFileOffsetByte.length == 8) { - return Server.readLong(flushedFileOffsetByte, 0); - } - } - return 0; - } - - @Override - public void teardown() - { - logger.debug("called teardown"); - try { - if (readStream != null) { - readStream.close(); - } - synchronized (HDFSStorage.this) { - if (nextReadStream != null) { - nextReadStream.close(); - } - } - - } catch (IOException e) { - throw new RuntimeException(e); - } finally { - closeUnflushedFiles(); - storageExecutor.shutdown(); - } - - } - - private Runnable getNextStream() - { - return new Runnable() - { - @Override - public void run() - { - try { - synchronized (HDFSStorage.this) { - nextRetrievalFile = retrievalFile + 1; - if (nextRetrievalFile > flushedFileCounter) { - nextRetrievalData = null; - return; - } - Path path = new Path(basePath, String.valueOf(nextRetrievalFile)); - Path offsetPath = new Path(basePath, nextRetrievalFile + OFFSET_SUFFIX); - nextRetrievalData = null; - nextRetrievalData = readData(path); - byte[] flushedOffset = readData(offsetPath); - nextFlushedLong = Server.readLong(flushedOffset, 0); - } - } catch (Throwable e) { - logger.warn("in storage executor ", e); - - } - } - }; - } - -} - http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d200737b/flume/src/main/java/com/datatorrent/flume/storage/Storage.java ---------------------------------------------------------------------- diff --git a/flume/src/main/java/com/datatorrent/flume/storage/Storage.java b/flume/src/main/java/com/datatorrent/flume/storage/Storage.java deleted file mode 100644 index 5130f3c..0000000 --- a/flume/src/main/java/com/datatorrent/flume/storage/Storage.java +++ /dev/null @@ -1,74 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package com.datatorrent.flume.storage; - -import com.datatorrent.netlet.util.Slice; - -/** - * <p>Storage interface.</p> - * - * @since 0.9.2 - */ -public interface Storage -{ - /** - * key in the context for Unique identifier for the storage which may be used to recover from failure. - */ - String ID = "id"; - - /** - * This stores the bytes and returns the unique identifier to retrieve these bytes - * - * @param bytes - * @return - */ - byte[] store(Slice bytes); - - /** - * This returns the data bytes for the current identifier and the identifier for next data bytes. <br/> - * The first eight bytes contain the identifier and the remaining bytes contain the data - * - * @param identifier - * @return - */ - byte[] retrieve(byte[] identifier); - - /** - * This returns data bytes and the identifier for the next data bytes. The identifier for current data bytes is based - * on the retrieve method call and number of retrieveNext method calls after retrieve method call. <br/> - * The first eight bytes contain the identifier and the remaining bytes contain the data - * - * @return - */ - byte[] retrieveNext(); - - /** - * This is used to clean up the files identified by identifier - * - * @param identifier - */ - void clean(byte[] identifier); - - /** - * This flushes the data from stream - * - */ - void flush(); - -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d200737b/flume/src/main/java/org/apache/apex/malhar/flume/discovery/Discovery.java ---------------------------------------------------------------------- diff --git a/flume/src/main/java/org/apache/apex/malhar/flume/discovery/Discovery.java b/flume/src/main/java/org/apache/apex/malhar/flume/discovery/Discovery.java new file mode 100644 index 0000000..619a625 --- /dev/null +++ b/flume/src/main/java/org/apache/apex/malhar/flume/discovery/Discovery.java @@ -0,0 +1,69 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.flume.discovery; + +import java.util.Collection; + +/** + * When DTFlumeSink server instance binds to the network interface, it can publish + * its whereabouts by invoking advertise method on the Discovery object. Similarly + * when it ceases accepting any more connections, it can publish its intent to do + * so by invoking unadvertise.<p /> + * Interesting parties can call discover method to get the list of addresses where + * they can find an available DTFlumeSink server instance. + * + * @param <T> - Type of the objects which can be discovered + * @since 0.9.3 + */ +public interface Discovery<T> +{ + /** + * Recall the previously published address as it's no longer valid. 
+ * + * @param service + */ + void unadvertise(Service<T> service); + + /** + * Advertise the host/port address where DTFlumeSink is accepting a client connection. + * + * @param service + */ + void advertise(Service<T> service); + + /** + * Discover all the addresses which are actively accepting the client connections. + * + * @return - Active server addresses which can accept the connections. + */ + Collection<Service<T>> discover(); + + interface Service<T> + { + String getHost(); + + int getPort(); + + T getPayload(); + + String getId(); + + } + +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d200737b/flume/src/main/java/org/apache/apex/malhar/flume/discovery/ZKAssistedDiscovery.java ---------------------------------------------------------------------- diff --git a/flume/src/main/java/org/apache/apex/malhar/flume/discovery/ZKAssistedDiscovery.java b/flume/src/main/java/org/apache/apex/malhar/flume/discovery/ZKAssistedDiscovery.java new file mode 100644 index 0000000..9a7dd3c --- /dev/null +++ b/flume/src/main/java/org/apache/apex/malhar/flume/discovery/ZKAssistedDiscovery.java @@ -0,0 +1,430 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.apex.malhar.flume.discovery; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collection; + +import javax.validation.constraints.NotNull; + +import org.codehaus.jackson.map.ObjectMapper; +import org.codehaus.jackson.map.ObjectReader; +import org.codehaus.jackson.map.ObjectWriter; +import org.codehaus.jackson.type.TypeReference; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.retry.RetryNTimes; +import org.apache.curator.utils.EnsurePath; +import org.apache.curator.x.discovery.ServiceDiscovery; +import org.apache.curator.x.discovery.ServiceDiscoveryBuilder; +import org.apache.curator.x.discovery.ServiceInstance; +import org.apache.curator.x.discovery.details.InstanceSerializer; +import org.apache.flume.conf.Configurable; + +import com.google.common.base.Throwables; + +import com.datatorrent.api.Component; + +/** + * <p>ZKAssistedDiscovery class.</p> + * + * @since 0.9.3 + */ +public class ZKAssistedDiscovery implements Discovery<byte[]>, + Component<com.datatorrent.api.Context>, Configurable, Serializable +{ + @NotNull + private String serviceName; + @NotNull + private String connectionString; + @NotNull + private String basePath; + private int connectionTimeoutMillis; + private int connectionRetryCount; + private int conntectionRetrySleepMillis; + private transient InstanceSerializerFactory instanceSerializerFactory; + private transient CuratorFramework curatorFramework; + private transient ServiceDiscovery<byte[]> discovery; + + public ZKAssistedDiscovery() + { + this.serviceName = "DTFlume"; + this.conntectionRetrySleepMillis = 500; + this.connectionRetryCount = 10; + this.connectionTimeoutMillis = 1000; + } + + @Override + public void unadvertise(Service<byte[]> service) + { + doAdvertise(service, false); + } + + @Override + public void advertise(Service<byte[]> service) + { + doAdvertise(service, true); + } + + public void doAdvertise(Service<byte[]> service, boolean flag) + { + try { + new EnsurePath(basePath).ensure(curatorFramework.getZookeeperClient()); + + ServiceInstance<byte[]> instance = getInstance(service); + if (flag) { + discovery.registerService(instance); + } else { + discovery.unregisterService(instance); + } + } catch (Exception e) { + throw Throwables.propagate(e); + } + } + + @Override + public Collection<Service<byte[]>> discover() + { + try { + new EnsurePath(basePath).ensure(curatorFramework.getZookeeperClient()); + + Collection<ServiceInstance<byte[]>> services = discovery.queryForInstances(serviceName); + ArrayList<Service<byte[]>> returnable = new ArrayList<Service<byte[]>>(services.size()); + for (final ServiceInstance<byte[]> service : services) { + returnable.add(new Service<byte[]>() + { + @Override + public String getHost() + { + return service.getAddress(); + } + + @Override + public int getPort() + { + return service.getPort(); + } + + @Override + public byte[] getPayload() + { + return service.getPayload(); + } + + @Override + public String getId() + { + return service.getId(); + } + + @Override + public String toString() + { + return "{" + getId() + " => " + getHost() + ':' + getPort() + '}'; + } + + }); + } + return returnable; + } catch (Exception e) { + throw Throwables.propagate(e); + } + } + + @Override + public String toString() + { + return "ZKAssistedDiscovery{" + 
"serviceName=" + serviceName + ", connectionString=" + connectionString + + ", basePath=" + basePath + ", connectionTimeoutMillis=" + connectionTimeoutMillis + ", connectionRetryCount=" + + connectionRetryCount + ", conntectionRetrySleepMillis=" + conntectionRetrySleepMillis + '}'; + } + + @Override + public int hashCode() + { + int hash = 7; + hash = 47 * hash + this.serviceName.hashCode(); + hash = 47 * hash + this.connectionString.hashCode(); + hash = 47 * hash + this.basePath.hashCode(); + hash = 47 * hash + this.connectionTimeoutMillis; + hash = 47 * hash + this.connectionRetryCount; + hash = 47 * hash + this.conntectionRetrySleepMillis; + return hash; + } + + @Override + public boolean equals(Object obj) + { + if (obj == null) { + return false; + } + if (getClass() != obj.getClass()) { + return false; + } + final ZKAssistedDiscovery other = (ZKAssistedDiscovery)obj; + if (!this.serviceName.equals(other.serviceName)) { + return false; + } + if (!this.connectionString.equals(other.connectionString)) { + return false; + } + if (!this.basePath.equals(other.basePath)) { + return false; + } + if (this.connectionTimeoutMillis != other.connectionTimeoutMillis) { + return false; + } + if (this.connectionRetryCount != other.connectionRetryCount) { + return false; + } + if (this.conntectionRetrySleepMillis != other.conntectionRetrySleepMillis) { + return false; + } + return true; + } + + ServiceInstance<byte[]> getInstance(Service<byte[]> service) throws Exception + { + return ServiceInstance.<byte[]>builder() + .name(serviceName) + .address(service.getHost()) + .port(service.getPort()) + .id(service.getId()) + .payload(service.getPayload()) + .build(); + } + + private ServiceDiscovery<byte[]> getDiscovery(CuratorFramework curatorFramework) + { + return ServiceDiscoveryBuilder.builder(byte[].class) + .basePath(basePath) + .client(curatorFramework) + .serializer(instanceSerializerFactory.getInstanceSerializer( + new TypeReference<ServiceInstance<byte[]>>() + {})).build(); + } + + /** + * @return the instanceSerializerFactory + */ + InstanceSerializerFactory getInstanceSerializerFactory() + { + return instanceSerializerFactory; + } + + /** + * @return the connectionString + */ + public String getConnectionString() + { + return connectionString; + } + + /** + * @param connectionString the connectionString to set + */ + public void setConnectionString(String connectionString) + { + this.connectionString = connectionString; + } + + /** + * @return the basePath + */ + public String getBasePath() + { + return basePath; + } + + /** + * @param basePath the basePath to set + */ + public void setBasePath(String basePath) + { + this.basePath = basePath; + } + + /** + * @return the connectionTimeoutMillis + */ + public int getConnectionTimeoutMillis() + { + return connectionTimeoutMillis; + } + + /** + * @param connectionTimeoutMillis the connectionTimeoutMillis to set + */ + public void setConnectionTimeoutMillis(int connectionTimeoutMillis) + { + this.connectionTimeoutMillis = connectionTimeoutMillis; + } + + /** + * @return the connectionRetryCount + */ + public int getConnectionRetryCount() + { + return connectionRetryCount; + } + + /** + * @param connectionRetryCount the connectionRetryCount to set + */ + public void setConnectionRetryCount(int connectionRetryCount) + { + this.connectionRetryCount = connectionRetryCount; + } + + /** + * @return the conntectionRetrySleepMillis + */ + public int getConntectionRetrySleepMillis() + { + return conntectionRetrySleepMillis; + } + + /** + * @param 
conntectionRetrySleepMillis the conntectionRetrySleepMillis to set + */ + public void setConntectionRetrySleepMillis(int conntectionRetrySleepMillis) + { + this.conntectionRetrySleepMillis = conntectionRetrySleepMillis; + } + + /** + * @return the serviceName + */ + public String getServiceName() + { + return serviceName; + } + + /** + * @param serviceName the serviceName to set + */ + public void setServiceName(String serviceName) + { + this.serviceName = serviceName; + } + + @Override + public void configure(org.apache.flume.Context context) + { + serviceName = context.getString("serviceName", "DTFlume"); + connectionString = context.getString("connectionString"); + basePath = context.getString("basePath"); + + connectionTimeoutMillis = context.getInteger("connectionTimeoutMillis", 1000); + connectionRetryCount = context.getInteger("connectionRetryCount", 10); + conntectionRetrySleepMillis = context.getInteger("connectionRetrySleepMillis", 500); + } + + @Override + public void setup(com.datatorrent.api.Context context) + { + ObjectMapper om = new ObjectMapper(); + instanceSerializerFactory = new InstanceSerializerFactory(om.reader(), om.writer()); + + curatorFramework = CuratorFrameworkFactory.builder() + .connectionTimeoutMs(connectionTimeoutMillis) + .retryPolicy(new RetryNTimes(connectionRetryCount, conntectionRetrySleepMillis)) + .connectString(connectionString) + .build(); + curatorFramework.start(); + + discovery = getDiscovery(curatorFramework); + try { + discovery.start(); + } catch (Exception ex) { + Throwables.propagate(ex); + } + } + + @Override + public void teardown() + { + try { + discovery.close(); + } catch (IOException ex) { + throw new RuntimeException(ex); + } finally { + curatorFramework.close(); + curatorFramework = null; + } + } + + public class InstanceSerializerFactory + { + private final ObjectReader objectReader; + private final ObjectWriter objectWriter; + + InstanceSerializerFactory(ObjectReader objectReader, ObjectWriter objectWriter) + { + this.objectReader = objectReader; + this.objectWriter = objectWriter; + } + + public <T> InstanceSerializer<T> getInstanceSerializer( + TypeReference<ServiceInstance<T>> typeReference) + { + return new JacksonInstanceSerializer<T>(objectReader, objectWriter, typeReference); + } + + final class JacksonInstanceSerializer<T> implements InstanceSerializer<T> + { + private final TypeReference<ServiceInstance<T>> typeRef; + private final ObjectWriter objectWriter; + private final ObjectReader objectReader; + + JacksonInstanceSerializer(ObjectReader objectReader, ObjectWriter objectWriter, + TypeReference<ServiceInstance<T>> typeRef) + { + this.objectReader = objectReader; + this.objectWriter = objectWriter; + this.typeRef = typeRef; + } + + @Override + public ServiceInstance<T> deserialize(byte[] bytes) throws Exception + { + return objectReader.withType(typeRef).readValue(bytes); + } + + @Override + public byte[] serialize(ServiceInstance<T> serviceInstance) throws Exception + { + ByteArrayOutputStream out = new ByteArrayOutputStream(); + objectWriter.writeValue(out, serviceInstance); + return out.toByteArray(); + } + + } + + } + + private static final long serialVersionUID = 201401221145L; + private static final Logger logger = LoggerFactory.getLogger(ZKAssistedDiscovery.class); +}
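

[Editor's note, not part of the diff] A minimal usage sketch of the relocated Discovery / ZKAssistedDiscovery API added above. The connection string, base path, and the class name DiscoveryUsageSketch are illustrative assumptions; only the setters, setup(), discover(), getHost()/getPort()/getId(), and teardown() come from the code in this commit. A DTFlumeSink server would normally call advertise()/unadvertise() itself; a client typically only needs discover().

import java.util.Collection;

import org.apache.apex.malhar.flume.discovery.Discovery.Service;
import org.apache.apex.malhar.flume.discovery.ZKAssistedDiscovery;

public class DiscoveryUsageSketch
{
  public static void main(String[] args)
  {
    ZKAssistedDiscovery discovery = new ZKAssistedDiscovery();
    // Illustrative values; in a Flume agent these come from configure(Context).
    discovery.setConnectionString("localhost:2181");
    discovery.setBasePath("/flume/discovery");
    discovery.setServiceName("DTFlume");
    // setup() builds the Curator client; the Context argument is not used by the
    // implementation shown above, so null is passed here for the sketch.
    discovery.setup(null);
    try {
      // List the sink instances that are currently advertising themselves.
      Collection<Service<byte[]>> services = discovery.discover();
      for (Service<byte[]> service : services) {
        System.out.println(service.getId() + " => " + service.getHost() + ':' + service.getPort());
      }
    } finally {
      discovery.teardown();
    }
  }
}
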

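
[Editor's note, not part of the diff] The Storage interface removed earlier in this commit defines a store/retrieve/retrieveNext/clean/flush contract whose returned arrays carry an eight-byte identifier prefix followed by the event payload. The drain() helper below is a hedged consumer sketch: the assumption that retrieve()/retrieveNext() return null when no more data is available, and the use of the pre-move package name, are illustrative only.

import java.util.Arrays;

import com.datatorrent.flume.storage.Storage;  // relocated by this commit; package name assumed

public class StorageUsageSketch
{
  // 'storage' may be any Storage implementation, e.g. the HDFSStorage shown above.
  static void drain(Storage storage, byte[] startIdentifier)
  {
    byte[] entry = storage.retrieve(startIdentifier);  // assumed to return null when empty
    byte[] lastIdentifier = startIdentifier;
    while (entry != null) {
      // Per the interface Javadoc: first eight bytes are the identifier, the rest is data.
      lastIdentifier = Arrays.copyOfRange(entry, 0, 8);
      byte[] payload = Arrays.copyOfRange(entry, 8, entry.length);
      process(payload);
      entry = storage.retrieveNext();
    }
    // Release the files up to the last identifier that was consumed.
    storage.clean(lastIdentifier);
  }

  static void process(byte[] payload)
  {
    // application-specific handling of the event bytes
  }
}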