Updated Branches: refs/heads/master ff56d0539 -> 655df3c45
http://git-wip-us.apache.org/repos/asf/crunch/blob/655df3c4/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HTableIterator.java ---------------------------------------------------------------------- diff --git a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HTableIterator.java b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HTableIterator.java new file mode 100644 index 0000000..daa4a48 --- /dev/null +++ b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HTableIterator.java @@ -0,0 +1,70 @@ +/* + * * + * * Licensed to the Apache Software Foundation (ASF) under one + * * or more contributor license agreements. See the NOTICE file + * * distributed with this work for additional information + * * regarding copyright ownership. The ASF licenses this file + * * to you under the Apache License, Version 2.0 (the + * * "License"); you may not use this file except in compliance + * * with the License. You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * + */ +package org.apache.crunch.io.hbase; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.crunch.Pair; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; + +import java.io.IOException; +import java.util.Iterator; + +class HTableIterator implements Iterator<Pair<ImmutableBytesWritable, Result>> { + private static final Log LOG = LogFactory.getLog(HTableIterator.class); + + private final HTable table; + private final ResultScanner scanner; + private final Iterator<Result> iter; + + public HTableIterator(HTable table, ResultScanner scanner) { + this.table = table; + this.scanner = scanner; + this.iter = scanner.iterator(); + } + + @Override + public boolean hasNext() { + boolean hasNext = iter.hasNext(); + if (!hasNext) { + scanner.close(); + try { + table.close(); + } catch (IOException e) { + LOG.error("Exception closing HTable: " + table.getTableName(), e); + } + } + return hasNext; + } + + @Override + public Pair<ImmutableBytesWritable, Result> next() { + Result next = iter.next(); + return Pair.of(new ImmutableBytesWritable(next.getRow()), next); + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } +}
