Updated Branches: refs/heads/flume-1.5 a7703d0b0 -> c6117b50d
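----------------------------------------------------------------------
The shape of the fix, for readers skimming the diff below: a BucketWriter
that is closed by the roll-interval or idle-timeout timer now runs a close
callback that removes it from the sink's sfWriters map, and any append
against the stale handle throws BucketClosedException so the sink can
build a fresh writer. The following is a minimal, self-contained sketch of
that pattern; the names (CachedWriter, Owner, WriterClosedException) are
illustrative stand-ins, not Flume's classes.

import java.util.HashMap;
import java.util.Map;

public class CloseCallbackSketch {

  // Stand-in for BucketClosedException: signals a permanently dead handle.
  static class WriterClosedException extends RuntimeException {
    WriterClosedException(String msg) { super(msg); }
  }

  interface CloseCallback {
    void run(String path);
  }

  static class CachedWriter {
    private final String path;
    private final CloseCallback onClose;
    private boolean closed = false;

    CachedWriter(String path, CloseCallback onClose) {
      this.path = path;
      this.onClose = onClose;
    }

    synchronized void append(String event) {
      if (closed) {
        // Once closed, the handle is never reopened; the caller must retry.
        throw new WriterClosedException("writer for " + path + " is closed");
      }
      // ... write the event to the underlying file ...
    }

    // Mirrors close(true) in the commit: close and deregister from the owner.
    synchronized void close(boolean callCloseCallback) {
      closed = true;
      if (callCloseCallback && onClose != null) {
        onClose.run(path);
      }
    }
  }

  static class Owner {
    private final Map<String, CachedWriter> writers = new HashMap<>();
    private final Object writersLock = new Object(); // like sfWritersLock

    void process(String path, String event) {
      CloseCallback remove = p -> {
        synchronized (writersLock) { writers.remove(p); }
      };
      CachedWriter w;
      synchronized (writersLock) {
        w = writers.computeIfAbsent(path, p -> new CachedWriter(p, remove));
      }
      try {
        w.append(event);
      } catch (WriterClosedException e) {
        // Raced with a roll-interval or idle close: cache a fresh writer.
        w = new CachedWriter(path, remove);
        synchronized (writersLock) { writers.put(path, w); }
        w.append(event);
      }
    }
  }

  public static void main(String[] args) {
    new Owner().process("/tmp/bucket1", "event-1");
  }
}

The detail that matters, mirrored from the commit: the callback's remove
and the owner's get/put run under one shared lock, so a writer that closes
concurrently cannot be re-cached over a newer one.
----------------------------------------------------------------------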
FLUME-2265. Closed bucket writers should be removed from sfwriters map
(Hari Shreedharan via Jarek Jarcec Cecho)

Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/c6117b50
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/c6117b50
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/c6117b50

Branch: refs/heads/flume-1.5
Commit: c6117b50d998d8989890bd1dca911cdf402ba1a2
Parents: a7703d0
Author: Jarek Jarcec Cecho <[email protected]>
Authored: Mon Dec 23 04:41:29 2013 -0800
Committer: Jarek Jarcec Cecho <[email protected]>
Committed: Mon Dec 23 04:43:08 2013 -0800

----------------------------------------------------------------------
 .../flume/sink/hdfs/BucketClosedException.java  |  30 +++
 .../apache/flume/sink/hdfs/BucketWriter.java    |  73 ++++---
 .../apache/flume/sink/hdfs/HDFSEventSink.java   |  74 ++++---
 .../flume/sink/hdfs/HDFSBadSeqWriter.java       |  69 -------
 .../flume/sink/hdfs/HDFSBadWriterFactory.java   |  42 ----
 .../flume/sink/hdfs/HDFSTestSeqWriter.java      |  75 +++++++
 .../flume/sink/hdfs/HDFSTestWriterFactory.java  |  42 ++++
 .../flume/sink/hdfs/TestBucketWriter.java       |  47 ++++-
 .../flume/sink/hdfs/TestHDFSEventSink.java      | 195 +++++++++++++++++--
 9 files changed, 473 insertions(+), 174 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/c6117b50/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketClosedException.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketClosedException.java b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketClosedException.java
new file mode 100644
index 0000000..1aca58f
--- /dev/null
+++ b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketClosedException.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.sink.hdfs;
+
+import org.apache.flume.FlumeException;
+
+public class BucketClosedException extends FlumeException{
+
+  private static final long serialVersionUID = -4216667125119540357L;
+
+  public BucketClosedException(String msg) {
+    super(msg);
+  }
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/c6117b50/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java
index 200d457..62e47de 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java
@@ -31,6 +31,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicLong;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.flume.Clock;
 import org.apache.flume.Context;
 import org.apache.flume.Event;
@@ -95,8 +96,8 @@ class BucketWriter {
   private SinkCounter sinkCounter;
   private final int idleTimeout;
   private volatile ScheduledFuture<Void> idleFuture;
-  private final WriterCallback onIdleCallback;
-  private final String onIdleCallbackPath;
+  private final WriterCallback onCloseCallback;
+  private final String onCloseCallbackPath;
   private final long callTimeout;
   private final ExecutorService callTimeoutPool;
   private final int maxConsecUnderReplRotations = 30; // make this config'able?
@@ -105,15 +106,15 @@ class BucketWriter {
 
   // flag that the bucket writer was closed due to idling and thus shouldn't be
   // reopened. Not ideal, but avoids internals of owners
-  protected boolean idleClosed = false;
+  protected boolean closed = false;
 
   BucketWriter(long rollInterval, long rollSize, long rollCount, long batchSize,
       Context context, String filePath, String fileName, String inUsePrefix,
       String inUseSuffix, String fileSuffix, CompressionCodec codeC,
       CompressionType compType, HDFSWriter writer,
       ScheduledExecutorService timedRollerPool, UserGroupInformation user,
-      SinkCounter sinkCounter, int idleTimeout, WriterCallback onIdleCallback,
-      String onIdleCallbackPath, long callTimeout,
+      SinkCounter sinkCounter, int idleTimeout, WriterCallback onCloseCallback,
+      String onCloseCallbackPath, long callTimeout,
      ExecutorService callTimeoutPool) {
     this.rollInterval = rollInterval;
     this.rollSize = rollSize;
@@ -131,8 +132,8 @@ class BucketWriter {
     this.user = user;
     this.sinkCounter = sinkCounter;
     this.idleTimeout = idleTimeout;
-    this.onIdleCallback = onIdleCallback;
-    this.onIdleCallbackPath = onIdleCallbackPath;
+    this.onCloseCallback = onCloseCallback;
+    this.onCloseCallbackPath = onCloseCallbackPath;
     this.callTimeout = callTimeout;
     this.callTimeoutPool = callTimeoutPool;
     fileExtensionCounter = new AtomicLong(clock.currentTimeMillis());
@@ -252,7 +253,8 @@ class BucketWriter {
           LOG.debug("Rolling file ({}): Roll scheduled after {} sec elapsed.",
               bucketPath, rollInterval);
           try {
-            close();
+            // Roll the file and remove reference from sfWriters map.
+            close(true);
           } catch(Throwable t) {
             LOG.error("Unexpected error", t);
           }
@@ -268,11 +270,24 @@ class BucketWriter {
 
   /**
    * Close the file handle and rename the temp file to the permanent filename.
-   * Safe to call multiple times. Logs HDFSWriter.close() exceptions.
+   * Safe to call multiple times. Logs HDFSWriter.close() exceptions. This
+   * method will not cause the bucket writer to be dereferenced from the HDFS
+   * sink that owns it. This method should be used only when size or count
+   * based rolling closes this file.
    * @throws IOException On failure to rename if temp file exists.
    * @throws InterruptedException
    */
   public synchronized void close() throws IOException, InterruptedException {
+    close(false);
+  }
+
+  /**
+   * Close the file handle and rename the temp file to the permanent filename.
+   * Safe to call multiple times. Logs HDFSWriter.close() exceptions.
+   * @throws IOException On failure to rename if temp file exists.
+   * @throws InterruptedException
+   */
+  public synchronized void close(boolean callCloseCallback)
+      throws IOException, InterruptedException {
     checkAndThrowInterruptedException();
     flush();
     LOG.debug("Closing {}", bucketPath);
@@ -306,6 +321,10 @@ class BucketWriter {
       renameBucket(); // could block or throw IOException
       fileSystem = null;
     }
+    if (callCloseCallback) {
+      runCloseAction();
+    }
+    closed = true;
   }
 
   /**
@@ -324,16 +343,10 @@ class BucketWriter {
       if(idleFuture == null || idleFuture.cancel(false)) {
         Callable<Void> idleAction = new Callable<Void>() {
           public Void call() throws Exception {
-            try {
-              if(isOpen) {
-                LOG.info("Closing idle bucketWriter {}", bucketPath);
-                idleClosed = true;
-                close();
-              }
-              if(onIdleCallback != null)
-                onIdleCallback.run(onIdleCallbackPath);
-            } catch(Throwable t) {
-              LOG.error("Unexpected error", t);
+            LOG.info("Closing idle bucketWriter {} at {}", bucketPath,
+                System.currentTimeMillis());
+            if (isOpen) {
+              close(true);
             }
             return null;
           }
@@ -345,6 +358,16 @@ class BucketWriter {
     }
   }
 
+  private void runCloseAction() {
+    try {
+      if(onCloseCallback != null) {
+        onCloseCallback.run(onCloseCallbackPath);
+      }
+    } catch(Throwable t) {
+      LOG.error("Unexpected error", t);
+    }
+  }
+
   /**
    * doFlush() must only be called by flush()
    * @throws IOException
@@ -396,10 +419,14 @@ class BucketWriter {
       }
       idleFuture = null;
     }
+
+    // If the bucket writer was closed due to roll timeout or idle timeout,
+    // force a new bucket writer to be created. Roll count and roll size will
+    // just reuse this one
     if (!isOpen) {
-      if(idleClosed) {
-        throw new IOException("This bucket writer was closed due to idling and this handle " +
-            "is thus no longer valid");
+      if (closed) {
+        throw new BucketClosedException("This bucket writer was closed and " +
+            "this handle is thus no longer valid");
       }
       open();
     }
@@ -446,7 +473,7 @@ class BucketWriter {
           bucketPath + ") and rethrowing exception.",
           e.getMessage());
       try {
-        close();
+        close(true);
       } catch (IOException e2) {
         LOG.warn("Caught IOException while closing file (" +
             bucketPath + "). Exception follows.", e2);

http://git-wip-us.apache.org/repos/asf/flume/blob/c6117b50/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java
index f0a6e4b..4ea78c1 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java
@@ -24,6 +24,7 @@ import java.util.ArrayList;
 import java.util.Calendar;
 import java.util.LinkedHashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.TimeZone;
 import java.util.Map.Entry;
 import java.util.concurrent.ExecutorService;
@@ -139,6 +140,7 @@ public class HDFSEventSink extends AbstractSink implements Configurable {
   private volatile int idleTimeout;
   private Clock clock;
+  private final Object sfWritersLock = new Object();
 
   /*
    * Extended Java LinkedHashMap for open file handle LRU queue.
@@ -182,6 +184,11 @@ public class HDFSEventSink extends AbstractSink implements Configurable {
     this.writerFactory = writerFactory;
   }
 
+  @VisibleForTesting
+  Map<String, BucketWriter> getSfWriters() {
+    return sfWriters;
+  }
+
   // read configuration and setup thresholds
   @Override
   public void configure(Context context) {
@@ -359,28 +366,29 @@ public class HDFSEventSink extends AbstractSink implements Configurable {
             timeZone, needRounding, roundUnit, roundValue, useLocalTime);
         String lookupPath = realPath + DIRECTORY_DELIMITER + realName;
-        BucketWriter bucketWriter = sfWriters.get(lookupPath);
-
-        // we haven't seen this file yet, so open it and cache the handle
-        if (bucketWriter == null) {
-          HDFSWriter hdfsWriter = writerFactory.getWriter(fileType);
-
-          WriterCallback idleCallback = null;
-          if(idleTimeout != 0) {
-            idleCallback = new WriterCallback() {
-              @Override
-              public void run(String bucketPath) {
-                sfWriters.remove(bucketPath);
-              }
-            };
+        BucketWriter bucketWriter;
+        HDFSWriter hdfsWriter = null;
+        // Callback to remove the reference to the bucket writer from the
+        // sfWriters map so that all buffers used by the HDFS file
+        // handles are garbage collected.
+        WriterCallback closeCallback = new WriterCallback() {
+          @Override
+          public void run(String bucketPath) {
+            LOG.info("Writer callback called.");
+            synchronized (sfWritersLock) {
+              sfWriters.remove(bucketPath);
+            }
+          }
+        };
+        synchronized (sfWritersLock) {
+          bucketWriter = sfWriters.get(lookupPath);
+          // we haven't seen this file yet, so open it and cache the handle
+          if (bucketWriter == null) {
+            hdfsWriter = writerFactory.getWriter(fileType);
+            bucketWriter = initializeBucketWriter(realPath, realName,
+                lookupPath, hdfsWriter, closeCallback);
+            sfWriters.put(lookupPath, bucketWriter);
           }
-          bucketWriter = new BucketWriter(rollInterval, rollSize, rollCount,
-              batchSize, context, realPath, realName, inUsePrefix, inUseSuffix,
-              suffix, codeC, compType, hdfsWriter, timedRollerPool,
-              proxyTicket, sinkCounter, idleTimeout, idleCallback,
-              lookupPath, callTimeout, callTimeoutPool);
-
-          sfWriters.put(lookupPath, bucketWriter);
         }
 
         // track the buckets getting written in this transaction
@@ -389,7 +397,19 @@ public class HDFSEventSink extends AbstractSink implements Configurable {
         }
 
         // Write the data to HDFS
-        bucketWriter.append(event);
+        try {
+          bucketWriter.append(event);
+        } catch (BucketClosedException ex) {
+          LOG.info("Bucket was closed while trying to append, " +
+              "reinitializing bucket and writing event.");
+          hdfsWriter = writerFactory.getWriter(fileType);
+          bucketWriter = initializeBucketWriter(realPath, realName,
+              lookupPath, hdfsWriter, closeCallback);
+          synchronized (sfWritersLock) {
+            sfWriters.put(lookupPath, bucketWriter);
+          }
+          bucketWriter.append(event);
+        }
       }
 
       if (txnEventCount == 0) {
@@ -430,6 +450,16 @@ public class HDFSEventSink extends AbstractSink implements Configurable {
     }
   }
 
+  private BucketWriter initializeBucketWriter(String realPath,
+      String realName, String lookupPath, HDFSWriter hdfsWriter,
+      WriterCallback closeCallback) {
+    return new BucketWriter(rollInterval, rollSize, rollCount,
+        batchSize, context, realPath, realName, inUsePrefix, inUseSuffix,
+        suffix, codeC, compType, hdfsWriter, timedRollerPool,
+        proxyTicket, sinkCounter, idleTimeout, closeCallback,
+        lookupPath, callTimeout, callTimeoutPool);
+  }
+
   @Override
   public void stop() {
     // do not constrain close() calls with a timeout

http://git-wip-us.apache.org/repos/asf/flume/blob/c6117b50/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/HDFSBadSeqWriter.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/HDFSBadSeqWriter.java b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/HDFSBadSeqWriter.java
deleted file mode 100644
index 63ab5af..0000000
--- a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/HDFSBadSeqWriter.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.flume.sink.hdfs;
-
-import java.io.IOException;
-
-import org.apache.flume.Event;
-import org.apache.hadoop.io.SequenceFile.CompressionType;
-import org.apache.hadoop.io.compress.CompressionCodec;
-
-public class HDFSBadSeqWriter extends HDFSSequenceFile {
-  protected volatile boolean closed, opened;
-
-  @Override
-  public void open(String filePath, CompressionCodec codeC,
-      CompressionType compType) throws IOException {
-    super.open(filePath, codeC, compType);
-    if(closed) {
-      opened = true;
-    }
-  }
-
-  @Override
-  public void append(Event e) throws IOException {
-
-    if (e.getHeaders().containsKey("fault")) {
-      throw new IOException("Injected fault");
-    } else if (e.getHeaders().containsKey("fault-once")) {
-      e.getHeaders().remove("fault-once");
-      throw new IOException("Injected fault");
-    } else if (e.getHeaders().containsKey("fault-until-reopen")) {
-      if(!(closed && opened)) {
-        throw new IOException("Injected fault-until-reopen");
-      }
-    } else if (e.getHeaders().containsKey("slow")) {
-      long waitTime = Long.parseLong(e.getHeaders().get("slow"));
-      try {
-        Thread.sleep(waitTime);
-      } catch (InterruptedException eT) {
-        throw new IOException("append interrupted", eT);
-      }
-    }
-
-    super.append(e);
-  }
-
-  @Override
-  public void close() throws IOException {
-    closed = true;
-    super.close();
-  }
-}

http://git-wip-us.apache.org/repos/asf/flume/blob/c6117b50/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/HDFSBadWriterFactory.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/HDFSBadWriterFactory.java b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/HDFSBadWriterFactory.java
deleted file mode 100644
index f5d0808..0000000
--- a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/HDFSBadWriterFactory.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.flume.sink.hdfs;
-
-import java.io.IOException;
-
-import org.apache.flume.sink.hdfs.HDFSBadSeqWriter;
-import org.apache.flume.sink.hdfs.HDFSBadDataStream;
-
-public class HDFSBadWriterFactory extends HDFSWriterFactory {
-  static final String BadSequenceFileType = "SequenceFile";
-  static final String BadDataStreamType = "DataStream";
-  static final String BadCompStreamType = "CompressedStream";
-
-  @Override
-  public HDFSWriter getWriter(String fileType) throws IOException {
-    if (fileType == BadSequenceFileType) {
-      return new HDFSBadSeqWriter();
-    } else if (fileType == BadDataStreamType) {
-      return new HDFSBadDataStream();
-    } else {
-      throw new IOException("File type " + fileType + " not supported");
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/flume/blob/c6117b50/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/HDFSTestSeqWriter.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/HDFSTestSeqWriter.java b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/HDFSTestSeqWriter.java
new file mode 100644
index 0000000..9c1cd09
--- /dev/null
+++ b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/HDFSTestSeqWriter.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flume.sink.hdfs;
+
+import java.io.IOException;
+
+import org.apache.flume.Event;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.io.compress.CompressionCodec;
+
+public class HDFSTestSeqWriter extends HDFSSequenceFile {
+  protected volatile boolean closed, opened;
+
+  private int openCount = 0;
+  HDFSTestSeqWriter(int openCount) {
+    this.openCount = openCount;
+  }
+
+  @Override
+  public void open(String filePath, CompressionCodec codeC,
+      CompressionType compType) throws IOException {
+    super.open(filePath, codeC, compType);
+    if(closed) {
+      opened = true;
+    }
+  }
+
+  @Override
+  public void append(Event e) throws IOException {
+
+    if (e.getHeaders().containsKey("fault")) {
+      throw new IOException("Injected fault");
+    } else if (e.getHeaders().containsKey("fault-once")) {
+      e.getHeaders().remove("fault-once");
+      throw new IOException("Injected fault");
+    } else if (e.getHeaders().containsKey("fault-until-reopen")) {
+      // opening first time.
+      if(openCount == 1) {
+        throw new IOException("Injected fault-until-reopen");
+      }
+    } else if (e.getHeaders().containsKey("slow")) {
+      long waitTime = Long.parseLong(e.getHeaders().get("slow"));
+      try {
+        Thread.sleep(waitTime);
+      } catch (InterruptedException eT) {
+        throw new IOException("append interrupted", eT);
+      }
+    }
+
+    super.append(e);
+  }
+
+  @Override
+  public void close() throws IOException {
+    closed = true;
+    super.close();
+  }
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/c6117b50/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/HDFSTestWriterFactory.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/HDFSTestWriterFactory.java b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/HDFSTestWriterFactory.java
new file mode 100644
index 0000000..70bd9e6
--- /dev/null
+++ b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/HDFSTestWriterFactory.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flume.sink.hdfs;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class HDFSTestWriterFactory extends HDFSWriterFactory {
+  static final String TestSequenceFileType = "SequenceFile";
+  static final String BadDataStreamType = "DataStream";
+
+  // so we can get a handle to this one in our test.
+  AtomicInteger openCount = new AtomicInteger(0);
+
+  @Override
+  public HDFSWriter getWriter(String fileType) throws IOException {
+    if (fileType == TestSequenceFileType) {
+      return new HDFSTestSeqWriter(openCount.incrementAndGet());
+    } else if (fileType == BadDataStreamType) {
+      return new HDFSBadDataStream();
+    } else {
+      throw new IOException("File type " + fileType + " not supported");
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/c6117b50/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestBucketWriter.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestBucketWriter.java b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestBucketWriter.java
index f741e03..b7cc586 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestBucketWriter.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestBucketWriter.java
@@ -23,6 +23,7 @@ import java.io.IOException;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.flume.Clock;
 import org.apache.flume.Context;
@@ -113,13 +114,19 @@ public class TestBucketWriter {
   public void testIntervalRoller() throws IOException, InterruptedException {
     final int ROLL_INTERVAL = 1; // seconds
     final int NUM_EVENTS = 10;
+    final AtomicBoolean calledBack = new AtomicBoolean(false);
 
     MockHDFSWriter hdfsWriter = new MockHDFSWriter();
     BucketWriter bucketWriter = new BucketWriter(ROLL_INTERVAL, 0, 0, 0, ctx,
-      "/tmp", "file", "", ".tmp", null, null, SequenceFile.CompressionType.NONE,
-      hdfsWriter, timedRollerPool, null,
-      new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()),
-      0, null, null, 30000, Executors.newSingleThreadExecutor());
+        "/tmp", "file", "", ".tmp", null, null, SequenceFile.CompressionType.NONE,
+        hdfsWriter, timedRollerPool, null,
+        new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()),
+        0, new HDFSEventSink.WriterCallback() {
+          @Override
+          public void run(String filePath) {
+            calledBack.set(true);
+          }
+        }, null, 30000, Executors.newSingleThreadExecutor());
 
     Event e = EventBuilder.withBody("foo", Charsets.UTF_8);
     long startNanos = System.nanoTime();
@@ -130,6 +137,14 @@ public class TestBucketWriter {
 
     // sleep to force a roll... wait 2x interval just to be sure
     Thread.sleep(2 * ROLL_INTERVAL * 1000L);
+    Assert.assertTrue(bucketWriter.closed);
+    Assert.assertTrue(calledBack.get());
+
+    bucketWriter = new BucketWriter(ROLL_INTERVAL, 0, 0, 0, ctx,
+        "/tmp", "file", "", ".tmp", null, null, SequenceFile.CompressionType.NONE,
+        hdfsWriter, timedRollerPool, null,
+        new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()),
+        0, null, null, 30000, Executors.newSingleThreadExecutor());
 
     // write one more event (to reopen a new file so we will roll again later)
     bucketWriter.append(e);
@@ -348,4 +363,28 @@ public class TestBucketWriter {
     Assert.assertTrue("Incorrect in use suffix",
         hdfsWriter.getOpenedFilePath().contains(SUFFIX));
   }
+
+  @Test
+  public void testCallbackOnClose() throws IOException, InterruptedException {
+    final int ROLL_INTERVAL = 1000; // seconds. Make sure it doesn't change in course of test
+    final String SUFFIX = "WELCOME_TO_THE_EREBOR";
+    final AtomicBoolean callbackCalled = new AtomicBoolean(false);
+
+    MockHDFSWriter hdfsWriter = new MockHDFSWriter();
+    BucketWriter bucketWriter = new BucketWriter(ROLL_INTERVAL, 0, 0, 0, ctx,
+        "/tmp", "file", "", SUFFIX, null, null, SequenceFile.CompressionType.NONE,
+        hdfsWriter, timedRollerPool, null,
+        new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()), 0,
+        new HDFSEventSink.WriterCallback() {
+          @Override
+          public void run(String filePath) {
+            callbackCalled.set(true);
+          }
+        }, "blah", 30000, Executors.newSingleThreadExecutor());
+
+    Event e = EventBuilder.withBody("foo", Charsets.UTF_8);
+    bucketWriter.append(e);
+    bucketWriter.close(true);
+
+    Assert.assertTrue(callbackCalled.get());
+  }
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/c6117b50/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java
index 5b7cec9..4337ef4 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java
@@ -23,7 +23,6 @@ import java.io.IOException;
 import java.io.InputStreamReader;
 import java.nio.ByteBuffer;
 import java.nio.charset.CharsetDecoder;
-import java.util.Arrays;
 import java.util.Calendar;
 import java.util.List;
 import java.util.UUID;
@@ -673,7 +672,7 @@ public class TestHDFSEventSink {
     int totalEvents = 0;
     int i = 1, j = 1;
 
-    HDFSBadWriterFactory badWriterFactory = new HDFSBadWriterFactory();
+    HDFSTestWriterFactory badWriterFactory = new HDFSTestWriterFactory();
     sink = new HDFSEventSink(badWriterFactory);
 
     // clear the test directory
@@ -689,7 +688,7 @@ public class TestHDFSEventSink {
     context.put("hdfs.filePrefix", fileName);
     context.put("hdfs.rollCount", String.valueOf(rollCount));
     context.put("hdfs.batchSize", String.valueOf(batchSize));
-    context.put("hdfs.fileType", HDFSBadWriterFactory.BadSequenceFileType);
+    context.put("hdfs.fileType", HDFSTestWriterFactory.TestSequenceFileType);
 
     Configurables.configure(sink, context);
@@ -840,7 +839,7 @@ public class TestHDFSEventSink {
    * This relies on Transactional rollback semantics for durability and
    * the behavior of the BucketWriter class of close()ing upon IOException.
    */
-  @Test
+  @Test
   public void testCloseReopen() throws InterruptedException,
       LifecycleException, EventDeliveryException, IOException {
@@ -852,7 +851,7 @@ public class TestHDFSEventSink {
     String newPath = testPath + "/singleBucket";
     int i = 1, j = 1;
 
-    HDFSBadWriterFactory badWriterFactory = new HDFSBadWriterFactory();
+    HDFSTestWriterFactory badWriterFactory = new HDFSTestWriterFactory();
     sink = new HDFSEventSink(badWriterFactory);
 
     // clear the test directory
@@ -868,7 +867,7 @@ public class TestHDFSEventSink {
     context.put("hdfs.filePrefix", fileName);
     context.put("hdfs.rollCount", String.valueOf(rollCount));
     context.put("hdfs.batchSize", String.valueOf(batchSize));
-    context.put("hdfs.fileType", HDFSBadWriterFactory.BadSequenceFileType);
+    context.put("hdfs.fileType", HDFSTestWriterFactory.TestSequenceFileType);
 
     Configurables.configure(sink, context);
@@ -910,6 +909,174 @@ public class TestHDFSEventSink {
     verifyOutputSequenceFiles(fs, conf, dirPath.toUri().getPath(), fileName,
         bodies);
   }
 
+  /**
+   * Test that the old bucket writer is closed at the end of rollInterval and
+   * a new one is used for the next set of events.
+   */
+  @Test
+  public void testCloseReopenOnRollTime() throws InterruptedException,
+      LifecycleException, EventDeliveryException, IOException {
+
+    LOG.debug("Starting...");
+    final int numBatches = 4;
+    final String fileName = "FlumeData";
+    final long batchSize = 2;
+    String newPath = testPath + "/singleBucket";
+    int i = 1, j = 1;
+
+    HDFSTestWriterFactory badWriterFactory = new HDFSTestWriterFactory();
+    sink = new HDFSEventSink(badWriterFactory);
+
+    // clear the test directory
+    Configuration conf = new Configuration();
+    FileSystem fs = FileSystem.get(conf);
+    Path dirPath = new Path(newPath);
+    fs.delete(dirPath, true);
+    fs.mkdirs(dirPath);
+
+    Context context = new Context();
+
+    context.put("hdfs.path", newPath);
+    context.put("hdfs.filePrefix", fileName);
+    context.put("hdfs.rollCount", String.valueOf(0));
+    context.put("hdfs.rollSize", String.valueOf(0));
+    context.put("hdfs.rollInterval", String.valueOf(2));
+    context.put("hdfs.batchSize", String.valueOf(batchSize));
+    context.put("hdfs.fileType", HDFSTestWriterFactory.TestSequenceFileType);
+
+    Configurables.configure(sink, context);
+
+    MemoryChannel channel = new MemoryChannel();
+    Configurables.configure(channel, new Context());
+
+    sink.setChannel(channel);
+    sink.start();
+
+    Calendar eventDate = Calendar.getInstance();
+    List<String> bodies = Lists.newArrayList();
+    // push the event batches into channel
+    for (i = 1; i < numBatches; i++) {
+      channel.getTransaction().begin();
+      try {
+        for (j = 1; j <= batchSize; j++) {
+          Event event = new SimpleEvent();
+          eventDate.clear();
+          eventDate.set(2011, i, i, i, 0); // yy mm dd
+          event.getHeaders().put("timestamp",
+              String.valueOf(eventDate.getTimeInMillis()));
+          event.getHeaders().put("hostname", "Host" + i);
+          String body = "Test." + i + "." + j;
+          event.setBody(body.getBytes());
+          bodies.add(body);
+          // inject fault
+          event.getHeaders().put("count-check", "");
+          channel.put(event);
+        }
+        channel.getTransaction().commit();
+      } finally {
+        channel.getTransaction().close();
+      }
+      LOG.info("execute sink to process the events: " + sink.process());
+      // Make sure the first file gets rolled due to rollTimeout.
+      if (i == 1) {
+        Thread.sleep(2001);
+      }
+    }
+    LOG.info("clear any events pending due to errors: " + sink.process());
+    sink.stop();
+
+    Assert.assertTrue(badWriterFactory.openCount.get() >= 2);
+    LOG.info("Total number of bucket writers opened: {}",
+        badWriterFactory.openCount.get());
+    verifyOutputSequenceFiles(fs, conf, dirPath.toUri().getPath(), fileName,
+        bodies);
+  }
+
+  /**
+   * Test that a close due to roll interval removes the bucketwriter from
+   * sfWriters map.
+   */
+  @Test
+  public void testCloseRemovesFromSFWriters() throws InterruptedException,
+      LifecycleException, EventDeliveryException, IOException {
+
+    LOG.debug("Starting...");
+    final String fileName = "FlumeData";
+    final long batchSize = 2;
+    String newPath = testPath + "/singleBucket";
+    int i = 1, j = 1;
+
+    HDFSTestWriterFactory badWriterFactory = new HDFSTestWriterFactory();
+    sink = new HDFSEventSink(badWriterFactory);
+
+    // clear the test directory
+    Configuration conf = new Configuration();
+    FileSystem fs = FileSystem.get(conf);
+    Path dirPath = new Path(newPath);
+    fs.delete(dirPath, true);
+    fs.mkdirs(dirPath);
+
+    Context context = new Context();
+
+    context.put("hdfs.path", newPath);
+    context.put("hdfs.filePrefix", fileName);
+    context.put("hdfs.rollCount", String.valueOf(0));
+    context.put("hdfs.rollSize", String.valueOf(0));
+    context.put("hdfs.rollInterval", String.valueOf(1));
+    context.put("hdfs.batchSize", String.valueOf(batchSize));
+    context.put("hdfs.fileType", HDFSTestWriterFactory.TestSequenceFileType);
+    String expectedLookupPath = newPath + "/FlumeData";
+
+    Configurables.configure(sink, context);
+
+    MemoryChannel channel = new MemoryChannel();
+    Configurables.configure(channel, new Context());
+
+    sink.setChannel(channel);
+    sink.start();
+
+    Calendar eventDate = Calendar.getInstance();
+    List<String> bodies = Lists.newArrayList();
+    // push the event batches into channel
+    channel.getTransaction().begin();
+    try {
+      for (j = 1; j <= 2 * batchSize; j++) {
+        Event event = new SimpleEvent();
+        eventDate.clear();
+        eventDate.set(2011, i, i, i, 0); // yy mm dd
+        event.getHeaders().put("timestamp",
+            String.valueOf(eventDate.getTimeInMillis()));
+        event.getHeaders().put("hostname", "Host" + i);
+        String body = "Test." + i + "." + j;
+        event.setBody(body.getBytes());
+        bodies.add(body);
+        // inject fault
+        event.getHeaders().put("count-check", "");
+        channel.put(event);
+      }
+      channel.getTransaction().commit();
+    } finally {
+      channel.getTransaction().close();
+    }
+    LOG.info("execute sink to process the events: " + sink.process());
+    Assert.assertTrue(sink.getSfWriters().containsKey(expectedLookupPath));
+    // Make sure the first file gets rolled due to rollTimeout.
+    Thread.sleep(2001);
+    Assert.assertFalse(sink.getSfWriters().containsKey(expectedLookupPath));
+    LOG.info("execute sink to process the events: " + sink.process());
+    // A new bucket writer should have been created for this bucket. So
+    // sfWriters map should not have the same key again.
+    Assert.assertTrue(sink.getSfWriters().containsKey(expectedLookupPath));
+    sink.stop();
+
+    LOG.info("Total number of bucket writers opened: {}",
+        badWriterFactory.openCount.get());
+    verifyOutputSequenceFiles(fs, conf, dirPath.toUri().getPath(), fileName,
+        bodies);
+  }
+
+
+
   /*
    * append using slow sink writer.
    * verify that the process returns backoff due to timeout
@@ -934,7 +1101,7 @@ public class TestHDFSEventSink {
     fs.mkdirs(dirPath);
 
     // create HDFS sink with slow writer
-    HDFSBadWriterFactory badWriterFactory = new HDFSBadWriterFactory();
+    HDFSTestWriterFactory badWriterFactory = new HDFSTestWriterFactory();
     sink = new HDFSEventSink(badWriterFactory);
 
     Context context = new Context();
@@ -942,7 +1109,7 @@ public class TestHDFSEventSink {
     context.put("hdfs.filePrefix", fileName);
     context.put("hdfs.rollCount", String.valueOf(rollCount));
     context.put("hdfs.batchSize", String.valueOf(batchSize));
-    context.put("hdfs.fileType", HDFSBadWriterFactory.BadSequenceFileType);
+    context.put("hdfs.fileType", HDFSTestWriterFactory.TestSequenceFileType);
     context.put("hdfs.callTimeout", Long.toString(1000));
 
     Configurables.configure(sink, context);
@@ -1004,7 +1171,7 @@ public class TestHDFSEventSink {
     fs.mkdirs(dirPath);
 
     // create HDFS sink with slow writer
-    HDFSBadWriterFactory badWriterFactory = new HDFSBadWriterFactory();
+    HDFSTestWriterFactory badWriterFactory = new HDFSTestWriterFactory();
     sink = new HDFSEventSink(badWriterFactory);
 
     Context context = new Context();
@@ -1012,7 +1179,7 @@ public class TestHDFSEventSink {
     context.put("hdfs.filePrefix", fileName);
     context.put("hdfs.rollCount", String.valueOf(rollCount));
     context.put("hdfs.batchSize", String.valueOf(batchSize));
-    context.put("hdfs.fileType", HDFSBadWriterFactory.BadSequenceFileType);
+    context.put("hdfs.fileType", HDFSTestWriterFactory.TestSequenceFileType);
     context.put("hdfs.appendTimeout", String.valueOf(appendTimeout));
 
     Configurables.configure(sink, context);
@@ -1127,10 +1294,10 @@ public class TestHDFSEventSink {
     sink.process();
     Thread.sleep(1001);
     // previous file should have timed out now
-    // this can throw an IOException(from the bucketWriter having idleClosed)
-    // this is not an issue as the sink will retry and get a fresh bucketWriter
-    // so long as the onIdleClose handler properly removes bucket writers that
-    // were closed due to idling
+    // this can throw BucketClosedException(from the bucketWriter having
+    // closed),this is not an issue as the sink will retry and get a fresh
+    // bucketWriter so long as the onClose handler properly removes
+    // bucket writers that were closed.
     sink.process();
     sink.process();
     Thread.sleep(500); // shouldn't be enough for a timeout to occur
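----------------------------------------------------------------------
A note on what the new tests are waiting for: the interval roll is a task
scheduled on the timedRollerPool when the bucket opens, which calls
close(true) rollInterval seconds later and thereby fires the callback that
deregisters the writer. A stand-alone sketch of that timer shape, assuming
a plain ScheduledExecutorService (the names and the printed message are
illustrative, not Flume's):

import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class RollTimerSketch {
  public static void main(String[] args) throws Exception {
    ScheduledExecutorService timedRollerPool =
        Executors.newScheduledThreadPool(1);
    long rollIntervalSec = 1; // hypothetical test value

    // In BucketWriter the scheduled action is close(true): rename the
    // in-use file and invoke the close callback so the owning sink
    // drops its cached reference.
    Callable<Void> rollAction = () -> {
      System.out.println("rolled and deregistered bucket");
      return null;
    };
    timedRollerPool.schedule(rollAction, rollIntervalSec, TimeUnit.SECONDS);

    // The tests sleep past the interval (e.g. 2 * rollInterval, or 2001 ms)
    // before asserting that the writer was closed and removed.
    Thread.sleep(2 * rollIntervalSec * 1000L);
    timedRollerPool.shutdown();
  }
}
----------------------------------------------------------------------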
