http://git-wip-us.apache.org/repos/asf/hbase/blob/052a6f07/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/TableScanResource.java ---------------------------------------------------------------------- diff --git a/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/TableScanResource.java b/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/TableScanResource.java new file mode 100644 index 0000000..5cc2c7b --- /dev/null +++ b/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/TableScanResource.java @@ -0,0 +1,168 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
*/
package org.apache.hadoop.hbase.rest;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

import javax.ws.rs.DefaultValue;
import javax.ws.rs.GET;
import javax.ws.rs.HeaderParam;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.ResponseBuilder;
import javax.ws.rs.core.UriInfo;
import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType;
import javax.xml.bind.annotation.XmlElement;
import javax.xml.bind.annotation.XmlRootElement;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.rest.model.CellModel;
import org.apache.hadoop.hbase.rest.model.RowModel;
import org.codehaus.jackson.annotate.JsonIgnore;
import org.codehaus.jackson.annotate.JsonProperty;

/**
 * REST resource that streams the rows of an already opened
 * {@link ResultScanner} back to the caller, capped at a caller supplied
 * number of rows. XML/JSON responses are produced by {@link #get(UriInfo)},
 * protobuf responses by {@code getProtobuf}.
 */
@InterfaceAudience.Private
public class TableScanResource extends ResourceBase {

  private static final Log LOG = LogFactory.getLog(TableScanResource.class);
  TableResource tableResource;
  ResultScanner results;
  int userRequestedLimit;

  /**
   * Constructor
   * @param scanner the scanner whose results are streamed to the client
   * @param userRequestedLimit the maximum number of rows to return
   * @throws IOException declared by the superclass constructor
   */
  public TableScanResource(ResultScanner scanner, int userRequestedLimit) throws IOException {
    super();
    this.results = scanner;
    this.userRequestedLimit = userRequestedLimit;
  }

  /**
   * Stream the scanner results as XML or JSON.
   * <p>
   * Rows are converted lazily: the returned model wraps an iterator that
   * pulls one {@link Result} at a time from the scanner and stops after
   * {@code userRequestedLimit} rows.
   * @param uriInfo (JAX-RS context variable) request URL
   * @return a streaming cell set model
   */
  @GET
  @Produces({ Constants.MIMETYPE_XML, Constants.MIMETYPE_JSON })
  public CellSetModelStream get(final @Context UriInfo uriInfo) {
    servlet.getMetrics().incrementRequests(1);
    final int rowsToSend = userRequestedLimit;
    servlet.getMetrics().incrementSucessfulScanRequests(1);
    final Iterator<Result> itr = results.iterator();
    // The ArrayList subclass is never populated; only iterator() is
    // overridden so that the serializers walk the scanner lazily.
    return new CellSetModelStream(new ArrayList<RowModel>() {
      @Override
      public Iterator<RowModel> iterator() {
        return new Iterator<RowModel>() {
          // Rows still allowed to be emitted.
          int count = rowsToSend;

          @Override
          public boolean hasNext() {
            return count > 0 && itr.hasNext();
          }

          @Override
          public void remove() {
            throw new UnsupportedOperationException(
                "Remove method cannot be used in CellSetModelStream");
          }

          @Override
          public RowModel next() {
            // Check the limit BEFORE touching the scanner so that no row is
            // consumed and thrown away once the limit has been reached.
            if (count <= 0) {
              throw new NoSuchElementException("Row limit of " + rowsToSend + " reached");
            }
            Result rs = itr.next();
            if (rs == null) {
              throw new NoSuchElementException("Scanner is exhausted");
            }
            byte[] rowKey = rs.getRow();
            RowModel rModel = new RowModel(rowKey);
            List<Cell> kvs = rs.listCells();
            for (Cell kv : kvs) {
              rModel.addCell(new CellModel(CellUtil.cloneFamily(kv), CellUtil.cloneQualifier(kv),
                  kv.getTimestamp(), CellUtil.cloneValue(kv)));
            }
            count--;
            return rModel;
          }
        };
      }
    });
  }

  /**
   * Stream the scanner results as protobuf.
   * <p>
   * Only {@code contentType} and {@code userRequestedLimit} are read by this
   * method body; the remaining parameters are accepted but not used here.
   * Note that the {@code userRequestedLimit} parameter shadows the field of
   * the same name.
   * @return the streaming response; {@code null} is returned only if the
   *   catch block runs to completion
   */
  @GET
  @Produces({ Constants.MIMETYPE_PROTOBUF, Constants.MIMETYPE_PROTOBUF_IETF })
  public Response getProtobuf(
      final @Context UriInfo uriInfo,
      final @PathParam("scanspec") String scanSpec,
      final @HeaderParam("Accept") String contentType,
      @DefaultValue(Integer.MAX_VALUE + "") @QueryParam(Constants.SCAN_LIMIT) int userRequestedLimit,
      @DefaultValue("") @QueryParam(Constants.SCAN_START_ROW) String startRow,
      @DefaultValue("") @QueryParam(Constants.SCAN_END_ROW) String endRow,
      @DefaultValue("column") @QueryParam(Constants.SCAN_COLUMN) List<String> column,
      @DefaultValue("1") @QueryParam(Constants.SCAN_MAX_VERSIONS) int maxVersions,
      @DefaultValue("-1") @QueryParam(Constants.SCAN_BATCH_SIZE) int batchSize,
      @DefaultValue("0") @QueryParam(Constants.SCAN_START_TIME) long startTime,
      @DefaultValue(Long.MAX_VALUE + "") @QueryParam(Constants.SCAN_END_TIME) long endTime,
      // NOTE(review): cacheBlocks is bound to SCAN_BATCH_SIZE, the same query
      // parameter as batchSize above. This looks like a copy/paste slip;
      // confirm the intended parameter name before relying on it.
      @DefaultValue("true") @QueryParam(Constants.SCAN_BATCH_SIZE) boolean cacheBlocks) {
    servlet.getMetrics().incrementRequests(1);
    try {
      int fetchSize = this.servlet.getConfiguration().getInt(Constants.SCAN_FETCH_SIZE, 10);
      ProtobufStreamingUtil stream = new ProtobufStreamingUtil(this.results, contentType,
          userRequestedLimit, fetchSize);
      servlet.getMetrics().incrementSucessfulScanRequests(1);
      ResponseBuilder response = Response.ok(stream);
      response.header("content-type", contentType);
      return response.build();
    } catch (Exception exp) {
      servlet.getMetrics().incrementFailedScanRequests(1);
      processException(exp);
      LOG.warn(exp);
      return null;
    }
  }

  /**
   * Serialization wrapper that exposes the rows as a list field to JAXB and
   * as an iterator property to Jackson.
   */
  @XmlRootElement(name = "CellSet")
  @XmlAccessorType(XmlAccessType.FIELD)
  public static class CellSetModelStream {
    // JAXB needs an arraylist for streaming
    @XmlElement(name = "Row")
    @JsonIgnore
    private ArrayList<RowModel> Row;

    public CellSetModelStream() {
    }

    public CellSetModelStream(final ArrayList<RowModel> rowList) {
      this.Row = rowList;
    }

    // jackson needs an iterator for streaming
    @JsonProperty("Row")
    public Iterator<RowModel> getIterator() {
      return Row.iterator();
    }
  }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/052a6f07/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/VersionResource.java ---------------------------------------------------------------------- diff --git a/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/VersionResource.java b/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/VersionResource.java new file mode 100644 index 0000000..ae93825 --- /dev/null +++ b/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/VersionResource.java @@ -0,0 +1,104 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
*/

package org.apache.hadoop.hbase.rest;

import java.io.IOException;

import javax.servlet.ServletContext;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.CacheControl;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.UriInfo;
import javax.ws.rs.core.Response.ResponseBuilder;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.rest.model.VersionModel;

/**
 * Implements REST software version reporting
 * <p>
 * <tt>/version/rest</tt>
 * <p>
 * <tt>/version</tt> (alias for <tt>/version/rest</tt>)
 */
@InterfaceAudience.Private
public class VersionResource extends ResourceBase {

  private static final Log LOG = LogFactory.getLog(VersionResource.class);

  // Shared by every response built by this resource: no-cache is switched
  // on and no-transform is switched off.
  static CacheControl cacheControl;
  static {
    cacheControl = new CacheControl();
    cacheControl.setNoCache(true);
    cacheControl.setNoTransform(false);
  }

  /**
   * Constructor
   * @throws IOException declared by the superclass constructor
   */
  public VersionResource() throws IOException {
    super();
  }

  /**
   * Build a response for a version request.
   * @param context servlet context
   * @param uriInfo (JAX-RS context variable) request URL
   * @return a response for a version request
   */
  @GET
  @Produces({MIMETYPE_TEXT, MIMETYPE_XML, MIMETYPE_JSON, MIMETYPE_PROTOBUF,
    MIMETYPE_PROTOBUF_IETF})
  public Response get(final @Context ServletContext context,
      final @Context UriInfo uriInfo) {
    if (LOG.isDebugEnabled()) {
      LOG.debug("GET " + uriInfo.getAbsolutePath());
    }
    servlet.getMetrics().incrementRequests(1);
    ResponseBuilder response = Response.ok(new VersionModel(context));
    response.cacheControl(cacheControl);
    servlet.getMetrics().incrementSucessfulGetRequests(1);
    return response.build();
  }

  /**
   * Dispatch <tt>/version/cluster</tt> to StorageClusterVersionResource
   * @return a new sub-resource instance
   * @throws IOException if the sub-resource cannot be constructed
   */
  @Path("cluster")
  public StorageClusterVersionResource getClusterVersionResource()
      throws IOException {
    return new StorageClusterVersionResource();
  }

  /**
   * Dispatch <tt>/version/rest</tt> to self.
   * @return this resource
   */
  @Path("rest")
  public VersionResource getVersionResource() {
    return this;
  }
}
 http://git-wip-us.apache.org/repos/asf/hbase/blob/052a6f07/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/client/Client.java ---------------------------------------------------------------------- diff --git a/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/client/Client.java b/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/client/Client.java new file mode 100644 index 0000000..ebedf57 --- /dev/null +++ b/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/client/Client.java @@ -0,0 +1,525 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.rest.client;

import java.io.IOException;
import java.io.InputStream;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.commons.httpclient.Header;
import org.apache.commons.httpclient.HttpClient;
import org.apache.commons.httpclient.HttpMethod;
import org.apache.commons.httpclient.HttpVersion;
import org.apache.commons.httpclient.MultiThreadedHttpConnectionManager;
import org.apache.commons.httpclient.URI;
import org.apache.commons.httpclient.methods.ByteArrayRequestEntity;
import org.apache.commons.httpclient.methods.DeleteMethod;
import org.apache.commons.httpclient.methods.GetMethod;
import org.apache.commons.httpclient.methods.HeadMethod;
import org.apache.commons.httpclient.methods.PostMethod;
import org.apache.commons.httpclient.methods.PutMethod;
import org.apache.commons.httpclient.params.HttpClientParams;
import org.apache.commons.httpclient.params.HttpConnectionManagerParams;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;

/**
 * A wrapper around HttpClient which provides some useful function and
 * semantics for interacting with the REST gateway.
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class Client {
  public static final Header[] EMPTY_HEADER_ARRAY = new Header[0];

  private static final Log LOG = LogFactory.getLog(Client.class);

  private HttpClient httpClient;
  private Cluster cluster;
  private boolean sslEnabled;

  private Map<String, String> extraHeaders;

  /**
   * Default Constructor
   */
  public Client() {
    this(null);
  }

  /**
   * Shared constructor body: stores the cluster and SSL flag and builds the
   * pooled HTTP/1.1 client.
   */
  private void initialize(Cluster cluster, boolean sslEnabled) {
    this.cluster = cluster;
    this.sslEnabled = sslEnabled;
    MultiThreadedHttpConnectionManager manager =
      new MultiThreadedHttpConnectionManager();
    HttpConnectionManagerParams managerParams = manager.getParams();
    managerParams.setConnectionTimeout(2000); // 2 s
    managerParams.setDefaultMaxConnectionsPerHost(10);
    managerParams.setMaxTotalConnections(100);
    extraHeaders = new ConcurrentHashMap<String, String>();
    this.httpClient = new HttpClient(manager);
    HttpClientParams clientParams = httpClient.getParams();
    clientParams.setVersion(HttpVersion.HTTP_1_1);
  }

  /**
   * Constructor
   * @param cluster the cluster definition
   */
  public Client(Cluster cluster) {
    initialize(cluster, false);
  }

  /**
   * Constructor
   * @param cluster the cluster definition
   * @param sslEnabled enable SSL or not
   */
  public Client(Cluster cluster, boolean sslEnabled) {
    initialize(cluster, sslEnabled);
  }

  /**
   * Shut down the client. Close any open persistent connections.
   */
  public void shutdown() {
    MultiThreadedHttpConnectionManager manager =
      (MultiThreadedHttpConnectionManager) httpClient.getHttpConnectionManager();
    manager.shutdown();
  }

  /**
   * @return the wrapped HttpClient
   */
  public HttpClient getHttpClient() {
    return httpClient;
  }

  /**
   * Add extra headers.  These extra headers will be applied to all http
   * methods before they are removed. If any header is not used any more,
   * client needs to remove it explicitly.
   */
  public void addExtraHeader(final String name, final String value) {
    extraHeaders.put(name, value);
  }

  /**
   * Get an extra header value.
   */
  public String getExtraHeader(final String name) {
    return extraHeaders.get(name);
  }

  /**
   * Get all extra headers (read-only).
   */
  public Map<String, String> getExtraHeaders() {
    return Collections.unmodifiableMap(extraHeaders);
  }

  /**
   * Remove an extra header.
   */
  public void removeExtraHeader(final String name) {
    extraHeaders.remove(name);
  }

  /**
   * Execute a transaction method given only the path. Will select at random
   * one of the members of the supplied cluster definition and iterate through
   * the list, wrapping around to the beginning, until a transaction can be
   * successfully completed or every member has been tried once. The
   * definition of success here is a complete HTTP transaction, irrespective
   * of result code.
   * @param cluster the cluster definition
   * @param method the transaction method
   * @param headers HTTP header values to send
   * @param path the properly urlencoded path
   * @return the HTTP response code
   * @throws IOException if the cluster is empty, or the exception raised by
   *   the last member tried when no member could complete the transaction
   */
  public int executePathOnly(Cluster cluster, HttpMethod method,
      Header[] headers, String path) throws IOException {
    int nodeCount = cluster.nodes.size();
    if (nodeCount < 1) {
      throw new IOException("Cluster is empty");
    }
    // Uniformly distributed starting point in [0, nodeCount).
    int start = (int) (Math.random() * nodeCount);
    IOException lastException = null;
    // Visit every node exactly once, wrapping past the end of the list so
    // that nodes located before the starting point are tried as well.
    for (int attempt = 0; attempt < nodeCount; attempt++) {
      cluster.lastHost = cluster.nodes.get((start + attempt) % nodeCount);
      try {
        StringBuilder sb = new StringBuilder();
        if (sslEnabled) {
          sb.append("https://");
        } else {
          sb.append("http://");
        }
        sb.append(cluster.lastHost);
        sb.append(path);
        URI uri = new URI(sb.toString(), true);
        return executeURI(method, headers, uri.toString());
      } catch (IOException e) {
        lastException = e;
      }
    }
    throw lastException;
  }

  /**
   * Execute a transaction method given a complete URI.
   * @param method the transaction method
   * @param headers HTTP header values to send
   * @param uri a properly urlencoded URI
   * @return the HTTP response code
   * @throws IOException
   */
  public int executeURI(HttpMethod method, Header[] headers, String uri)
      throws IOException {
    method.setURI(new URI(uri, true));
    // Extra headers are added first, then the per-request headers.
    for (Map.Entry<String, String> e: extraHeaders.entrySet()) {
      method.addRequestHeader(e.getKey(), e.getValue());
    }
    if (headers != null) {
      for (Header header: headers) {
        method.addRequestHeader(header);
      }
    }
    long startTime = System.currentTimeMillis();
    int code = httpClient.executeMethod(method);
    long endTime = System.currentTimeMillis();
    if (LOG.isDebugEnabled()) {
      LOG.debug(method.getName() + " " + uri + " " + code + " " +
        method.getStatusText() + " in " + (endTime - startTime) + " ms");
    }
    return code;
  }

  /**
   * Execute a transaction method. Will call either <tt>executePathOnly</tt>
   * or <tt>executeURI</tt> depending on whether a path only is supplied in
   * 'path', or if a complete URI is passed instead, respectively.
   * @param cluster the cluster definition
   * @param method the HTTP method
   * @param headers HTTP header values to send
   * @param path the properly urlencoded path or URI
   * @return the HTTP response code
   * @throws IOException
   */
  public int execute(Cluster cluster, HttpMethod method, Header[] headers,
      String path) throws IOException {
    if (path.startsWith("/")) {
      return executePathOnly(cluster, method, headers, path);
    }
    return executeURI(method, headers, path);
  }

  /**
   * @return the cluster definition
   */
  public Cluster getCluster() {
    return cluster;
  }

  /**
   * @param cluster the cluster definition
   */
  public void setCluster(Cluster cluster) {
    this.cluster = cluster;
  }

  /**
   * Send a HEAD request
   * @param path the path or URI
   * @return a Response object with response detail
   * @throws IOException
   */
  public Response head(String path) throws IOException {
    return head(cluster, path, null);
  }

  /**
   * Send a HEAD request
   * @param cluster the cluster definition
   * @param path the path or URI
   * @param headers the HTTP headers to include in the request
   * @return a Response object with response detail
   * @throws IOException
   */
  public Response head(Cluster cluster, String path, Header[] headers)
      throws IOException {
    HeadMethod method = new HeadMethod();
    try {
      // Forward the caller supplied request headers (may be null).
      int code = execute(cluster, method, headers, path);
      Header[] responseHeaders = method.getResponseHeaders();
      return new Response(code, responseHeaders, null);
    } finally {
      method.releaseConnection();
    }
  }

  /**
   * Send a GET request
   * @param path the path or URI
   * @return a Response object with response detail
   * @throws IOException
   */
  public Response get(String path) throws IOException {
    return get(cluster, path);
  }

  /**
   * Send a GET request
   * @param cluster the cluster definition
   * @param path the path or URI
   * @return a Response object with response detail
   * @throws IOException
   */
  public Response get(Cluster cluster, String path) throws IOException {
    return get(cluster, path, EMPTY_HEADER_ARRAY);
  }

  /**
   * Send a GET request
   * @param path the path or URI
   * @param accept Accept header value
   * @return a Response object with response detail
   * @throws IOException
   */
  public Response get(String path, String accept) throws IOException {
    return get(cluster, path, accept);
  }

  /**
   * Send a GET request
   * @param cluster the cluster definition
   * @param path the path or URI
   * @param accept Accept header value
   * @return a Response object with response detail
   * @throws IOException
   */
  public Response get(Cluster cluster, String path, String accept)
      throws IOException {
    Header[] headers = new Header[1];
    headers[0] = new Header("Accept", accept);
    return get(cluster, path, headers);
  }

  /**
   * Send a GET request
   * @param path the path or URI
   * @param headers the HTTP headers to include in the request,
   * <tt>Accept</tt> must be supplied
   * @return a Response object with response detail
   * @throws IOException
   */
  public Response get(String path, Header[] headers) throws IOException {
    return get(cluster, path, headers);
  }

  /**
   * Send a GET request
   * @param c the cluster definition
   * @param path the path or URI
   * @param headers the HTTP headers to include in the request
   * @return a Response object with response detail
   * @throws IOException
   */
  public Response get(Cluster c, String path, Header[] headers)
      throws IOException {
    GetMethod method = new GetMethod();
    try {
      int code = execute(c, method, headers, path);
      headers = method.getResponseHeaders();
      byte[] body = method.getResponseBody();
      InputStream in = method.getResponseBodyAsStream();
      return new Response(code, headers, body, in);
    } finally {
      method.releaseConnection();
    }
  }

  /**
   * Send a PUT request
   * @param path the path or URI
   * @param contentType the content MIME type
   * @param content the content bytes
   * @return a Response object with response detail
   * @throws IOException
   */
  public Response put(String path, String contentType, byte[] content)
      throws IOException {
    return put(cluster, path, contentType, content);
  }

  /**
   * Send a PUT request
   * @param cluster the cluster definition
   * @param path the path or URI
   * @param contentType the content MIME type
   * @param content the content bytes
   * @return a Response object with response detail
   * @throws IOException
   */
  public Response put(Cluster cluster, String path, String contentType,
      byte[] content) throws IOException {
    Header[] headers = new Header[1];
    headers[0] = new Header("Content-Type", contentType);
    return put(cluster, path, headers, content);
  }

  /**
   * Send a PUT request
   * @param path the path or URI
   * @param headers the HTTP headers to include, <tt>Content-Type</tt> must be
   * supplied
   * @param content the content bytes
   * @return a Response object with response detail
   * @throws IOException
   */
  public Response put(String path, Header[] headers, byte[] content)
      throws IOException {
    return put(cluster, path, headers, content);
  }

  /**
   * Send a PUT request
   * @param cluster the cluster definition
   * @param path the path or URI
   * @param headers the HTTP headers to include, <tt>Content-Type</tt> must be
   * supplied
   * @param content the content bytes
   * @return a Response object with response detail
   * @throws IOException
   */
  public Response put(Cluster cluster, String path, Header[] headers,
      byte[] content) throws IOException {
    PutMethod method = new PutMethod();
    try {
      method.setRequestEntity(new ByteArrayRequestEntity(content));
      int code = execute(cluster, method, headers, path);
      headers = method.getResponseHeaders();
      content = method.getResponseBody();
      return new Response(code, headers, content);
    } finally {
      method.releaseConnection();
    }
  }

  /**
   * Send a POST request
   * @param path the path or URI
   * @param contentType the content MIME type
   * @param content the content bytes
   * @return a Response object with response detail
   * @throws IOException
   */
  public Response post(String path, String contentType, byte[] content)
      throws IOException {
    return post(cluster, path, contentType, content);
  }

  /**
   * Send a POST request
   * @param cluster the cluster definition
   * @param path the path or URI
   * @param contentType the content MIME type
   * @param content the content bytes
   * @return a Response object with response detail
   * @throws IOException
   */
  public Response post(Cluster cluster, String path, String contentType,
      byte[] content) throws IOException {
    Header[] headers = new Header[1];
    headers[0] = new Header("Content-Type", contentType);
    return post(cluster, path, headers, content);
  }

  /**
   * Send a POST request
   * @param path the path or URI
   * @param headers the HTTP headers to include, <tt>Content-Type</tt> must be
   * supplied
   * @param content the content bytes
   * @return a Response object with response detail
   * @throws IOException
   */
  public Response post(String path, Header[] headers, byte[] content)
      throws IOException {
    return post(cluster, path, headers, content);
  }

  /**
   * Send a POST request
   * @param cluster the cluster definition
   * @param path the path or URI
   * @param headers the HTTP headers to include, <tt>Content-Type</tt> must be
   * supplied
   * @param content the content bytes
   * @return a Response object with response detail
   * @throws IOException
   */
  public Response post(Cluster cluster, String path, Header[] headers,
      byte[] content) throws IOException {
    PostMethod method = new PostMethod();
    try {
      method.setRequestEntity(new ByteArrayRequestEntity(content));
      int code = execute(cluster, method, headers, path);
      headers = method.getResponseHeaders();
      content = method.getResponseBody();
      return new Response(code, headers, content);
    } finally {
      method.releaseConnection();
    }
  }

  /**
   * Send a DELETE request
   * @param path the path or URI
   * @return a Response object with response detail
   * @throws IOException
   */
  public Response delete(String path) throws IOException {
    return delete(cluster, path);
  }

  /**
   * Send a DELETE request
   * @param cluster the cluster definition
   * @param path the path or URI
   * @return a Response object with response detail
   * @throws IOException
   */
  public Response delete(Cluster cluster, String path) throws IOException {
    DeleteMethod method = new DeleteMethod();
    try {
      int code = execute(cluster, method, null, path);
      Header[] headers = method.getResponseHeaders();
      byte[] content = method.getResponseBody();
      return new Response(code, headers, content);
    } finally {
      method.releaseConnection();
    }
  }
}
 http://git-wip-us.apache.org/repos/asf/hbase/blob/052a6f07/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/client/Cluster.java ---------------------------------------------------------------------- diff --git a/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/client/Cluster.java b/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/client/Cluster.java new file mode 100644 index 0000000..a2de329 --- /dev/null +++ b/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/client/Cluster.java @@ -0,0 +1,103 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.rest.client;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;

/**
 * A list of 'host:port' addresses of HTTP servers operating as a single
 * entity, for example multiple redundant web service gateways.
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class Cluster {
  protected List<String> nodes =
    Collections.synchronizedList(new ArrayList<String>());
  protected String lastHost;

  /**
   * Constructor
   */
  public Cluster() {}

  /**
   * Constructor
   * @param nodes a list of service locations, in 'host:port' format
   */
  public Cluster(List<String> nodes) {
    // The parameter shadows the field: qualify with "this" so the supplied
    // locations are copied into this cluster's own list instead of the
    // argument being appended to itself.
    this.nodes.addAll(nodes);
  }

  /**
   * @return true if no locations have been added, false otherwise
   */
  public boolean isEmpty() {
    return nodes.isEmpty();
  }

  /**
   * Add a node to the cluster
   * @param node the service location in 'host:port' format
   * @return this cluster, to allow call chaining
   */
  public Cluster add(String node) {
    nodes.add(node);
    return this;
  }

  /**
   * Add a node to the cluster
   * @param name host name
   * @param port service port
   * @return this cluster, to allow call chaining
   */
  public Cluster add(String name, int port) {
    StringBuilder sb = new StringBuilder();
    sb.append(name);
    sb.append(':');
    sb.append(port);
    return add(sb.toString());
  }

  /**
   * Remove a node from the cluster
   * @param node the service location in 'host:port' format
   * @return this cluster, to allow call chaining
   */
  public Cluster remove(String node) {
    nodes.remove(node);
    return this;
  }

  /**
   * Remove a node from the cluster
   * @param name host name
   * @param port service port
   * @return this cluster, to allow call chaining
   */
  public Cluster remove(String name, int port) {
    StringBuilder sb = new StringBuilder();
    sb.append(name);
    sb.append(':');
    sb.append(port);
    return remove(sb.toString());
  }
}
 http://git-wip-us.apache.org/repos/asf/hbase/blob/052a6f07/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/client/RemoteAdmin.java ---------------------------------------------------------------------- diff --git a/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/client/RemoteAdmin.java b/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/client/RemoteAdmin.java new file mode 100644 index 0000000..2809ca9 --- /dev/null +++ b/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/client/RemoteAdmin.java @@ -0,0 +1,401 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hbase.rest.client; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InterruptedIOException; + +import javax.xml.bind.JAXBContext; +import javax.xml.bind.JAXBException; +import javax.xml.bind.Unmarshaller; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; + +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.rest.Constants; +import org.apache.hadoop.hbase.rest.model.StorageClusterStatusModel; +import org.apache.hadoop.hbase.rest.model.StorageClusterVersionModel; +import org.apache.hadoop.hbase.rest.model.TableListModel; +import org.apache.hadoop.hbase.rest.model.TableSchemaModel; +import org.apache.hadoop.hbase.rest.model.VersionModel; +import org.apache.hadoop.hbase.util.Bytes; + [email protected] [email protected] +public class RemoteAdmin { + + final Client client; + final Configuration conf; + final String accessToken; + final int maxRetries; + final long sleepTime; + + // This unmarshaller is necessary for getting the /version/cluster resource. + // This resource does not support protobufs. Therefore this is necessary to + // request/interpret it as XML. 
+ private static volatile Unmarshaller versionClusterUnmarshaller; + + /** + * Constructor + * + * @param client + * @param conf + */ + public RemoteAdmin(Client client, Configuration conf) { + this(client, conf, null); + } + + static Unmarshaller getUnmarsheller() throws JAXBException { + + if (versionClusterUnmarshaller == null) { + + RemoteAdmin.versionClusterUnmarshaller = JAXBContext.newInstance( + StorageClusterVersionModel.class).createUnmarshaller(); + } + return RemoteAdmin.versionClusterUnmarshaller; + } + + /** + * Constructor + * @param client + * @param conf + * @param accessToken + */ + public RemoteAdmin(Client client, Configuration conf, String accessToken) { + this.client = client; + this.conf = conf; + this.accessToken = accessToken; + this.maxRetries = conf.getInt("hbase.rest.client.max.retries", 10); + this.sleepTime = conf.getLong("hbase.rest.client.sleep", 1000); + } + + /** + * @param tableName name of table to check + * @return true if all regions of the table are available + * @throws IOException if a remote or network exception occurs + */ + public boolean isTableAvailable(String tableName) throws IOException { + return isTableAvailable(Bytes.toBytes(tableName)); + } + + /** + * @return string representing the rest api's version + * @throws IOEXception + * if the endpoint does not exist, there is a timeout, or some other + * general failure mode + */ + public VersionModel getRestVersion() throws IOException { + + StringBuilder path = new StringBuilder(); + path.append('/'); + if (accessToken != null) { + path.append(accessToken); + path.append('/'); + } + + path.append("version/rest"); + + int code = 0; + for (int i = 0; i < maxRetries; i++) { + Response response = client.get(path.toString(), + Constants.MIMETYPE_PROTOBUF); + code = response.getCode(); + switch (code) { + case 200: + + VersionModel v = new VersionModel(); + return (VersionModel) v.getObjectFromMessage(response.getBody()); + case 404: + throw new IOException("REST version 
not found"); + case 509: + try { + Thread.sleep(sleepTime); + } catch (InterruptedException e) { + throw (InterruptedIOException)new InterruptedIOException().initCause(e); + } + break; + default: + throw new IOException("get request to " + path.toString() + + " returned " + code); + } + } + throw new IOException("get request to " + path.toString() + " timed out"); + } + + /** + * @return string representing the cluster's version + * @throws IOEXception if the endpoint does not exist, there is a timeout, or some other general failure mode + */ + public StorageClusterStatusModel getClusterStatus() throws IOException { + + StringBuilder path = new StringBuilder(); + path.append('/'); + if (accessToken !=null) { + path.append(accessToken); + path.append('/'); + } + + path.append("status/cluster"); + + int code = 0; + for (int i = 0; i < maxRetries; i++) { + Response response = client.get(path.toString(), + Constants.MIMETYPE_PROTOBUF); + code = response.getCode(); + switch (code) { + case 200: + StorageClusterStatusModel s = new StorageClusterStatusModel(); + return (StorageClusterStatusModel) s.getObjectFromMessage(response + .getBody()); + case 404: + throw new IOException("Cluster version not found"); + case 509: + try { + Thread.sleep(sleepTime); + } catch (InterruptedException e) { + throw (InterruptedIOException)new InterruptedIOException().initCause(e); + } + break; + default: + throw new IOException("get request to " + path + " returned " + code); + } + } + throw new IOException("get request to " + path + " timed out"); + } + + /** + * @return string representing the cluster's version + * @throws IOEXception + * if the endpoint does not exist, there is a timeout, or some other + * general failure mode + */ + public StorageClusterVersionModel getClusterVersion() throws IOException { + + StringBuilder path = new StringBuilder(); + path.append('/'); + if (accessToken != null) { + path.append(accessToken); + path.append('/'); + } + + path.append("version/cluster"); 
+ + int code = 0; + for (int i = 0; i < maxRetries; i++) { + Response response = client.get(path.toString(), Constants.MIMETYPE_XML); + code = response.getCode(); + switch (code) { + case 200: + try { + + return (StorageClusterVersionModel) getUnmarsheller().unmarshal( + new ByteArrayInputStream(response.getBody())); + } catch (JAXBException jaxbe) { + + throw new IOException( + "Issue parsing StorageClusterVersionModel object in XML form: " + + jaxbe.getLocalizedMessage()); + } + case 404: + throw new IOException("Cluster version not found"); + case 509: + try { + Thread.sleep(sleepTime); + } catch (InterruptedException e) { + throw (InterruptedIOException)new InterruptedIOException().initCause(e); + } + break; + default: + throw new IOException(path.toString() + " request returned " + code); + } + } + throw new IOException("get request to " + path.toString() + + " request timed out"); + } + + /** + * @param tableName name of table to check + * @return true if all regions of the table are available + * @throws IOException if a remote or network exception occurs + */ + public boolean isTableAvailable(byte[] tableName) throws IOException { + StringBuilder path = new StringBuilder(); + path.append('/'); + if (accessToken != null) { + path.append(accessToken); + path.append('/'); + } + path.append(Bytes.toStringBinary(tableName)); + path.append('/'); + path.append("exists"); + int code = 0; + for (int i = 0; i < maxRetries; i++) { + Response response = client.get(path.toString(), Constants.MIMETYPE_PROTOBUF); + code = response.getCode(); + switch (code) { + case 200: + return true; + case 404: + return false; + case 509: + try { + Thread.sleep(sleepTime); + } catch (InterruptedException e) { + throw (InterruptedIOException)new InterruptedIOException().initCause(e); + } + break; + default: + throw new IOException("get request to " + path.toString() + " returned " + code); + } + } + throw new IOException("get request to " + path.toString() + " timed out"); + } + + /** + 
* Creates a new table. + * @param desc table descriptor for table + * @throws IOException if a remote or network exception occurs + */ + public void createTable(HTableDescriptor desc) + throws IOException { + TableSchemaModel model = new TableSchemaModel(desc); + StringBuilder path = new StringBuilder(); + path.append('/'); + if (accessToken != null) { + path.append(accessToken); + path.append('/'); + } + path.append(desc.getTableName()); + path.append('/'); + path.append("schema"); + int code = 0; + for (int i = 0; i < maxRetries; i++) { + Response response = client.put(path.toString(), Constants.MIMETYPE_PROTOBUF, + model.createProtobufOutput()); + code = response.getCode(); + switch (code) { + case 201: + return; + case 509: + try { + Thread.sleep(sleepTime); + } catch (InterruptedException e) { + throw (InterruptedIOException)new InterruptedIOException().initCause(e); + } + break; + default: + throw new IOException("create request to " + path.toString() + " returned " + code); + } + } + throw new IOException("create request to " + path.toString() + " timed out"); + } + + /** + * Deletes a table. + * @param tableName name of table to delete + * @throws IOException if a remote or network exception occurs + */ + public void deleteTable(final String tableName) throws IOException { + deleteTable(Bytes.toBytes(tableName)); + } + + /** + * Deletes a table. 
+ * @param tableName name of table to delete + * @throws IOException if a remote or network exception occurs + */ + public void deleteTable(final byte [] tableName) throws IOException { + StringBuilder path = new StringBuilder(); + path.append('/'); + if (accessToken != null) { + path.append(accessToken); + path.append('/'); + } + path.append(Bytes.toStringBinary(tableName)); + path.append('/'); + path.append("schema"); + int code = 0; + for (int i = 0; i < maxRetries; i++) { + Response response = client.delete(path.toString()); + code = response.getCode(); + switch (code) { + case 200: + return; + case 509: + try { + Thread.sleep(sleepTime); + } catch (InterruptedException e) { + throw (InterruptedIOException)new InterruptedIOException().initCause(e); + } + break; + default: + throw new IOException("delete request to " + path.toString() + " returned " + code); + } + } + throw new IOException("delete request to " + path.toString() + " timed out"); + } + + /** + * @return string representing the cluster's version + * @throws IOEXception + * if the endpoint does not exist, there is a timeout, or some other + * general failure mode + */ + public TableListModel getTableList() throws IOException { + + StringBuilder path = new StringBuilder(); + path.append('/'); + if (accessToken != null) { + path.append(accessToken); + path.append('/'); + } + + int code = 0; + for (int i = 0; i < maxRetries; i++) { + // Response response = client.get(path.toString(), + // Constants.MIMETYPE_XML); + Response response = client.get(path.toString(), + Constants.MIMETYPE_PROTOBUF); + code = response.getCode(); + switch (code) { + case 200: + TableListModel t = new TableListModel(); + return (TableListModel) t.getObjectFromMessage(response.getBody()); + case 404: + throw new IOException("Table list not found"); + case 509: + try { + Thread.sleep(sleepTime); + } catch (InterruptedException e) { + throw (InterruptedIOException)new InterruptedIOException().initCause(e); + } + break; + default: 
+ throw new IOException("get request to " + path.toString() + + " request returned " + code); + } + } + throw new IOException("get request to " + path.toString() + + " request timed out"); + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/052a6f07/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java ---------------------------------------------------------------------- diff --git a/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java b/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java new file mode 100644 index 0000000..65bf509 --- /dev/null +++ b/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java @@ -0,0 +1,858 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hbase.rest.client; + +import com.google.protobuf.Descriptors; +import com.google.protobuf.Message; +import com.google.protobuf.Service; +import com.google.protobuf.ServiceException; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Append; +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Durability; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.HTableInterface; +import org.apache.hadoop.hbase.client.Increment; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Row; +import org.apache.hadoop.hbase.client.RowMutations; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.coprocessor.Batch; +import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback; +import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; +import org.apache.hadoop.hbase.io.TimeRange; +import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel; +import org.apache.hadoop.hbase.rest.Constants; +import org.apache.hadoop.hbase.rest.model.CellModel; +import org.apache.hadoop.hbase.rest.model.CellSetModel; +import org.apache.hadoop.hbase.rest.model.RowModel; +import org.apache.hadoop.hbase.rest.model.ScannerModel; +import org.apache.hadoop.hbase.rest.model.TableSchemaModel; 
+import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.util.StringUtils; + +import java.io.IOException; +import java.io.InterruptedIOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TreeMap; + +/** + * HTable interface to remote tables accessed via REST gateway + */ [email protected] [email protected] +public class RemoteHTable implements HTableInterface { + + private static final Log LOG = LogFactory.getLog(RemoteHTable.class); + + final Client client; + final Configuration conf; + final byte[] name; + final int maxRetries; + final long sleepTime; + + @SuppressWarnings("rawtypes") + protected String buildRowSpec(final byte[] row, final Map familyMap, + final long startTime, final long endTime, final int maxVersions) { + StringBuffer sb = new StringBuffer(); + sb.append('/'); + sb.append(Bytes.toStringBinary(name)); + sb.append('/'); + sb.append(Bytes.toStringBinary(row)); + Set families = familyMap.entrySet(); + if (families != null) { + Iterator i = familyMap.entrySet().iterator(); + sb.append('/'); + while (i.hasNext()) { + Map.Entry e = (Map.Entry)i.next(); + Collection quals = (Collection)e.getValue(); + if (quals == null || quals.isEmpty()) { + // this is an unqualified family. 
append the family name and NO ':' + sb.append(Bytes.toStringBinary((byte[])e.getKey())); + } else { + Iterator ii = quals.iterator(); + while (ii.hasNext()) { + sb.append(Bytes.toStringBinary((byte[])e.getKey())); + sb.append(':'); + Object o = ii.next(); + // Puts use byte[] but Deletes use KeyValue + if (o instanceof byte[]) { + sb.append(Bytes.toStringBinary((byte[])o)); + } else if (o instanceof KeyValue) { + sb.append(Bytes.toStringBinary(((KeyValue)o).getQualifier())); + } else { + throw new RuntimeException("object type not handled"); + } + if (ii.hasNext()) { + sb.append(','); + } + } + } + if (i.hasNext()) { + sb.append(','); + } + } + } + if (startTime >= 0 && endTime != Long.MAX_VALUE) { + sb.append('/'); + sb.append(startTime); + if (startTime != endTime) { + sb.append(','); + sb.append(endTime); + } + } else if (endTime != Long.MAX_VALUE) { + sb.append('/'); + sb.append(endTime); + } + if (maxVersions > 1) { + sb.append("?v="); + sb.append(maxVersions); + } + return sb.toString(); + } + + protected String buildMultiRowSpec(final byte[][] rows, int maxVersions) { + StringBuilder sb = new StringBuilder(); + sb.append('/'); + sb.append(Bytes.toStringBinary(name)); + sb.append("/multiget/"); + if (rows == null || rows.length == 0) { + return sb.toString(); + } + sb.append("?"); + for(int i=0; i<rows.length; i++) { + byte[] rk = rows[i]; + if (i != 0) { + sb.append('&'); + } + sb.append("row="); + sb.append(Bytes.toStringBinary(rk)); + } + sb.append("&v="); + sb.append(maxVersions); + + return sb.toString(); + } + + protected Result[] buildResultFromModel(final CellSetModel model) { + List<Result> results = new ArrayList<Result>(); + for (RowModel row: model.getRows()) { + List<Cell> kvs = new ArrayList<Cell>(); + for (CellModel cell: row.getCells()) { + byte[][] split = KeyValue.parseColumn(cell.getColumn()); + byte[] column = split[0]; + byte[] qualifier = null; + if (split.length == 1) { + qualifier = HConstants.EMPTY_BYTE_ARRAY; + } else if 
(split.length == 2) { + qualifier = split[1]; + } else { + throw new IllegalArgumentException("Invalid familyAndQualifier provided."); + } + kvs.add(new KeyValue(row.getKey(), column, qualifier, + cell.getTimestamp(), cell.getValue())); + } + results.add(Result.create(kvs)); + } + return results.toArray(new Result[results.size()]); + } + + protected CellSetModel buildModelFromPut(Put put) { + RowModel row = new RowModel(put.getRow()); + long ts = put.getTimeStamp(); + for (List<Cell> cells: put.getFamilyCellMap().values()) { + for (Cell cell: cells) { + row.addCell(new CellModel(CellUtil.cloneFamily(cell), CellUtil.cloneQualifier(cell), + ts != HConstants.LATEST_TIMESTAMP ? ts : cell.getTimestamp(), + CellUtil.cloneValue(cell))); + } + } + CellSetModel model = new CellSetModel(); + model.addRow(row); + return model; + } + + /** + * Constructor + * @param client + * @param name + */ + public RemoteHTable(Client client, String name) { + this(client, HBaseConfiguration.create(), Bytes.toBytes(name)); + } + + /** + * Constructor + * @param client + * @param conf + * @param name + */ + public RemoteHTable(Client client, Configuration conf, String name) { + this(client, conf, Bytes.toBytes(name)); + } + + /** + * Constructor + * @param client + * @param conf + * @param name + */ + public RemoteHTable(Client client, Configuration conf, byte[] name) { + this.client = client; + this.conf = conf; + this.name = name; + this.maxRetries = conf.getInt("hbase.rest.client.max.retries", 10); + this.sleepTime = conf.getLong("hbase.rest.client.sleep", 1000); + } + + public byte[] getTableName() { + return name.clone(); + } + + @Override + public TableName getName() { + return TableName.valueOf(name); + } + + public Configuration getConfiguration() { + return conf; + } + + public HTableDescriptor getTableDescriptor() throws IOException { + StringBuilder sb = new StringBuilder(); + sb.append('/'); + sb.append(Bytes.toStringBinary(name)); + sb.append('/'); + sb.append("schema"); + for 
(int i = 0; i < maxRetries; i++) { + Response response = client.get(sb.toString(), Constants.MIMETYPE_PROTOBUF); + int code = response.getCode(); + switch (code) { + case 200: + TableSchemaModel schema = new TableSchemaModel(); + schema.getObjectFromMessage(response.getBody()); + return schema.getTableDescriptor(); + case 509: + try { + Thread.sleep(sleepTime); + } catch (InterruptedException e) { + throw (InterruptedIOException)new InterruptedIOException().initCause(e); + } + break; + default: + throw new IOException("schema request returned " + code); + } + } + throw new IOException("schema request timed out"); + } + + public void close() throws IOException { + client.shutdown(); + } + + public Result get(Get get) throws IOException { + TimeRange range = get.getTimeRange(); + String spec = buildRowSpec(get.getRow(), get.getFamilyMap(), + range.getMin(), range.getMax(), get.getMaxVersions()); + if (get.getFilter() != null) { + LOG.warn("filters not supported on gets"); + } + Result[] results = getResults(spec); + if (results.length > 0) { + if (results.length > 1) { + LOG.warn("too many results for get (" + results.length + ")"); + } + return results[0]; + } else { + return new Result(); + } + } + + public Result[] get(List<Get> gets) throws IOException { + byte[][] rows = new byte[gets.size()][]; + int maxVersions = 1; + int count = 0; + + for(Get g:gets) { + + if ( count == 0 ) { + maxVersions = g.getMaxVersions(); + } else if (g.getMaxVersions() != maxVersions) { + LOG.warn("MaxVersions on Gets do not match, using the first in the list ("+maxVersions+")"); + } + + if (g.getFilter() != null) { + LOG.warn("filters not supported on gets"); + } + + rows[count] = g.getRow(); + count ++; + } + + String spec = buildMultiRowSpec(rows, maxVersions); + + return getResults(spec); + } + + private Result[] getResults(String spec) throws IOException { + for (int i = 0; i < maxRetries; i++) { + Response response = client.get(spec, Constants.MIMETYPE_PROTOBUF); + int code = 
response.getCode(); + switch (code) { + case 200: + CellSetModel model = new CellSetModel(); + model.getObjectFromMessage(response.getBody()); + Result[] results = buildResultFromModel(model); + if ( results.length > 0) { + return results; + } + // fall through + case 404: + return new Result[0]; + + case 509: + try { + Thread.sleep(sleepTime); + } catch (InterruptedException e) { + throw (InterruptedIOException)new InterruptedIOException().initCause(e); + } + break; + default: + throw new IOException("get request returned " + code); + } + } + throw new IOException("get request timed out"); + } + + public boolean exists(Get get) throws IOException { + LOG.warn("exists() is really get(), just use get()"); + Result result = get(get); + return (result != null && !(result.isEmpty())); + } + + /** + * exists(List) is really a list of get() calls. Just use get(). + * @param gets list of Get to test for the existence + */ + public boolean[] existsAll(List<Get> gets) throws IOException { + LOG.warn("exists(List<Get>) is really list of get() calls, just use get()"); + boolean[] results = new boolean[gets.size()]; + for (int i = 0; i < results.length; i++) { + results[i] = exists(gets.get(i)); + } + return results; + } + + @Deprecated + public Boolean[] exists(List<Get> gets) throws IOException { + boolean[] results = existsAll(gets); + Boolean[] objectResults = new Boolean[results.length]; + for (int i = 0; i < results.length; ++i) { + objectResults[i] = results[i]; + } + return objectResults; + } + + public void put(Put put) throws IOException { + CellSetModel model = buildModelFromPut(put); + StringBuilder sb = new StringBuilder(); + sb.append('/'); + sb.append(Bytes.toStringBinary(name)); + sb.append('/'); + sb.append(Bytes.toStringBinary(put.getRow())); + for (int i = 0; i < maxRetries; i++) { + Response response = client.put(sb.toString(), Constants.MIMETYPE_PROTOBUF, + model.createProtobufOutput()); + int code = response.getCode(); + switch (code) { + case 200: + 
return; + case 509: + try { + Thread.sleep(sleepTime); + } catch (InterruptedException e) { + throw (InterruptedIOException)new InterruptedIOException().initCause(e); + } + break; + default: + throw new IOException("put request failed with " + code); + } + } + throw new IOException("put request timed out"); + } + + public void put(List<Put> puts) throws IOException { + // this is a trick: The gateway accepts multiple rows in a cell set and + // ignores the row specification in the URI + + // separate puts by row + TreeMap<byte[],List<Cell>> map = + new TreeMap<byte[],List<Cell>>(Bytes.BYTES_COMPARATOR); + for (Put put: puts) { + byte[] row = put.getRow(); + List<Cell> cells = map.get(row); + if (cells == null) { + cells = new ArrayList<Cell>(); + map.put(row, cells); + } + for (List<Cell> l: put.getFamilyCellMap().values()) { + cells.addAll(l); + } + } + + // build the cell set + CellSetModel model = new CellSetModel(); + for (Map.Entry<byte[], List<Cell>> e: map.entrySet()) { + RowModel row = new RowModel(e.getKey()); + for (Cell cell: e.getValue()) { + row.addCell(new CellModel(cell)); + } + model.addRow(row); + } + + // build path for multiput + StringBuilder sb = new StringBuilder(); + sb.append('/'); + sb.append(Bytes.toStringBinary(name)); + sb.append("/$multiput"); // can be any nonexistent row + for (int i = 0; i < maxRetries; i++) { + Response response = client.put(sb.toString(), Constants.MIMETYPE_PROTOBUF, + model.createProtobufOutput()); + int code = response.getCode(); + switch (code) { + case 200: + return; + case 509: + try { + Thread.sleep(sleepTime); + } catch (InterruptedException e) { + throw (InterruptedIOException)new InterruptedIOException().initCause(e); + } + break; + default: + throw new IOException("multiput request failed with " + code); + } + } + throw new IOException("multiput request timed out"); + } + + public void delete(Delete delete) throws IOException { + String spec = buildRowSpec(delete.getRow(), delete.getFamilyCellMap(), + 
delete.getTimeStamp(), delete.getTimeStamp(), 1); + for (int i = 0; i < maxRetries; i++) { + Response response = client.delete(spec); + int code = response.getCode(); + switch (code) { + case 200: + return; + case 509: + try { + Thread.sleep(sleepTime); + } catch (InterruptedException e) { + throw (InterruptedIOException)new InterruptedIOException().initCause(e); + } + break; + default: + throw new IOException("delete request failed with " + code); + } + } + throw new IOException("delete request timed out"); + } + + public void delete(List<Delete> deletes) throws IOException { + for (Delete delete: deletes) { + delete(delete); + } + } + + public void flushCommits() throws IOException { + // no-op + } + + class Scanner implements ResultScanner { + + String uri; + + public Scanner(Scan scan) throws IOException { + ScannerModel model; + try { + model = ScannerModel.fromScan(scan); + } catch (Exception e) { + throw new IOException(e); + } + StringBuffer sb = new StringBuffer(); + sb.append('/'); + sb.append(Bytes.toStringBinary(name)); + sb.append('/'); + sb.append("scanner"); + for (int i = 0; i < maxRetries; i++) { + Response response = client.post(sb.toString(), + Constants.MIMETYPE_PROTOBUF, model.createProtobufOutput()); + int code = response.getCode(); + switch (code) { + case 201: + uri = response.getLocation(); + return; + case 509: + try { + Thread.sleep(sleepTime); + } catch (InterruptedException e) { + throw (InterruptedIOException)new InterruptedIOException().initCause(e); + } + break; + default: + throw new IOException("scan request failed with " + code); + } + } + throw new IOException("scan request timed out"); + } + + @Override + public Result[] next(int nbRows) throws IOException { + StringBuilder sb = new StringBuilder(uri); + sb.append("?n="); + sb.append(nbRows); + for (int i = 0; i < maxRetries; i++) { + Response response = client.get(sb.toString(), + Constants.MIMETYPE_PROTOBUF); + int code = response.getCode(); + switch (code) { + case 200: + 
CellSetModel model = new CellSetModel(); + model.getObjectFromMessage(response.getBody()); + return buildResultFromModel(model); + case 204: + case 206: + return null; + case 509: + try { + Thread.sleep(sleepTime); + } catch (InterruptedException e) { + throw (InterruptedIOException)new InterruptedIOException().initCause(e); + } + break; + default: + throw new IOException("scanner.next request failed with " + code); + } + } + throw new IOException("scanner.next request timed out"); + } + + @Override + public Result next() throws IOException { + Result[] results = next(1); + if (results == null || results.length < 1) { + return null; + } + return results[0]; + } + + class Iter implements Iterator<Result> { + + Result cache; + + public Iter() { + try { + cache = Scanner.this.next(); + } catch (IOException e) { + LOG.warn(StringUtils.stringifyException(e)); + } + } + + @Override + public boolean hasNext() { + return cache != null; + } + + @Override + public Result next() { + Result result = cache; + try { + cache = Scanner.this.next(); + } catch (IOException e) { + LOG.warn(StringUtils.stringifyException(e)); + cache = null; + } + return result; + } + + @Override + public void remove() { + throw new RuntimeException("remove() not supported"); + } + + } + + @Override + public Iterator<Result> iterator() { + return new Iter(); + } + + @Override + public void close() { + try { + client.delete(uri); + } catch (IOException e) { + LOG.warn(StringUtils.stringifyException(e)); + } + } + + } + + public ResultScanner getScanner(Scan scan) throws IOException { + return new Scanner(scan); + } + + public ResultScanner getScanner(byte[] family) throws IOException { + Scan scan = new Scan(); + scan.addFamily(family); + return new Scanner(scan); + } + + public ResultScanner getScanner(byte[] family, byte[] qualifier) + throws IOException { + Scan scan = new Scan(); + scan.addColumn(family, qualifier); + return new Scanner(scan); + } + + public boolean isAutoFlush() { + return true; + 
} + + public Result getRowOrBefore(byte[] row, byte[] family) throws IOException { + throw new IOException("getRowOrBefore not supported"); + } + + public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier, + byte[] value, Put put) throws IOException { + // column to check-the-value + put.add(new KeyValue(row, family, qualifier, value)); + + CellSetModel model = buildModelFromPut(put); + StringBuilder sb = new StringBuilder(); + sb.append('/'); + sb.append(Bytes.toStringBinary(name)); + sb.append('/'); + sb.append(Bytes.toStringBinary(put.getRow())); + sb.append("?check=put"); + + for (int i = 0; i < maxRetries; i++) { + Response response = client.put(sb.toString(), + Constants.MIMETYPE_PROTOBUF, model.createProtobufOutput()); + int code = response.getCode(); + switch (code) { + case 200: + return true; + case 304: // NOT-MODIFIED + return false; + case 509: + try { + Thread.sleep(sleepTime); + } catch (final InterruptedException e) { + throw (InterruptedIOException)new InterruptedIOException().initCause(e); + } + break; + default: + throw new IOException("checkAndPut request failed with " + code); + } + } + throw new IOException("checkAndPut request timed out"); + } + + public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier, + CompareOp compareOp, byte[] value, Put put) throws IOException { + throw new IOException("checkAndPut for non-equal comparison not implemented"); + } + + public boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier, + byte[] value, Delete delete) throws IOException { + Put put = new Put(row); + // column to check-the-value + put.add(new KeyValue(row, family, qualifier, value)); + CellSetModel model = buildModelFromPut(put); + StringBuilder sb = new StringBuilder(); + sb.append('/'); + sb.append(Bytes.toStringBinary(name)); + sb.append('/'); + sb.append(Bytes.toStringBinary(row)); + sb.append("?check=delete"); + + for (int i = 0; i < maxRetries; i++) { + Response response = client.put(sb.toString(), + 
Constants.MIMETYPE_PROTOBUF, model.createProtobufOutput()); + int code = response.getCode(); + switch (code) { + case 200: + return true; + case 304: // NOT-MODIFIED + return false; + case 509: + try { + Thread.sleep(sleepTime); + } catch (final InterruptedException e) { + throw (InterruptedIOException)new InterruptedIOException().initCause(e); + } + break; + default: + throw new IOException("checkAndDelete request failed with " + code); + } + } + throw new IOException("checkAndDelete request timed out"); + } + + public boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier, + CompareOp compareOp, byte[] value, Delete delete) throws IOException { + throw new IOException("checkAndDelete for non-equal comparison not implemented"); + } + + public Result increment(Increment increment) throws IOException { + throw new IOException("Increment not supported"); + } + + public Result append(Append append) throws IOException { + throw new IOException("Append not supported"); + } + + public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, + long amount) throws IOException { + throw new IOException("incrementColumnValue not supported"); + } + + public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, + long amount, Durability durability) throws IOException { + throw new IOException("incrementColumnValue not supported"); + } + + @Override + public void batch(List<? extends Row> actions, Object[] results) throws IOException { + throw new IOException("batch not supported"); + } + + @Override + public Object[] batch(List<? extends Row> actions) throws IOException { + throw new IOException("batch not supported"); + } + + @Override + public <R> void batchCallback(List<? extends Row> actions, Object[] results, + Batch.Callback<R> callback) throws IOException, InterruptedException { + throw new IOException("batchCallback not supported"); + } + + @Override + public <R> Object[] batchCallback(List<? 
extends Row> actions, Batch.Callback<R> callback) + throws IOException, InterruptedException { + throw new IOException("batchCallback not supported"); + } + + @Override + public CoprocessorRpcChannel coprocessorService(byte[] row) { + throw new UnsupportedOperationException("coprocessorService not implemented"); + } + + @Override + public <T extends Service, R> Map<byte[], R> coprocessorService(Class<T> service, + byte[] startKey, byte[] endKey, Batch.Call<T, R> callable) + throws ServiceException, Throwable { + throw new UnsupportedOperationException("coprocessorService not implemented"); + } + + @Override + public <T extends Service, R> void coprocessorService(Class<T> service, + byte[] startKey, byte[] endKey, Batch.Call<T, R> callable, Batch.Callback<R> callback) + throws ServiceException, Throwable { + throw new UnsupportedOperationException("coprocessorService not implemented"); + } + + @Override + public void mutateRow(RowMutations rm) throws IOException { + throw new IOException("atomicMutation not supported"); + } + + @Override + public void setAutoFlush(boolean autoFlush) { + throw new UnsupportedOperationException("setAutoFlush not implemented"); + } + + @Override + public void setAutoFlush(boolean autoFlush, boolean clearBufferOnFail) { + throw new UnsupportedOperationException("setAutoFlush not implemented"); + } + + @Override + public void setAutoFlushTo(boolean autoFlush) { + throw new UnsupportedOperationException("setAutoFlushTo not implemented"); + } + + @Override + public long getWriteBufferSize() { + throw new UnsupportedOperationException("getWriteBufferSize not implemented"); + } + + @Override + public void setWriteBufferSize(long writeBufferSize) throws IOException { + throw new IOException("setWriteBufferSize not supported"); + } + + @Override + public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, + long amount, boolean writeToWAL) throws IOException { + throw new IOException("incrementColumnValue not supported"); + 
} + + @Override + public <R extends Message> Map<byte[], R> batchCoprocessorService( + Descriptors.MethodDescriptor method, Message request, + byte[] startKey, byte[] endKey, R responsePrototype) throws ServiceException, Throwable { + throw new UnsupportedOperationException("batchCoprocessorService not implemented"); + } + + @Override + public <R extends Message> void batchCoprocessorService( + Descriptors.MethodDescriptor method, Message request, + byte[] startKey, byte[] endKey, R responsePrototype, Callback<R> callback) + throws ServiceException, Throwable { + throw new UnsupportedOperationException("batchCoprocessorService not implemented"); + } + + @Override public boolean checkAndMutate(byte[] row, byte[] family, byte[] qualifier, + CompareOp compareOp, byte[] value, RowMutations rm) throws IOException { + throw new UnsupportedOperationException("checkAndMutate not implemented"); + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/052a6f07/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/client/Response.java ---------------------------------------------------------------------- diff --git a/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/client/Response.java b/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/client/Response.java new file mode 100644 index 0000000..871b646 --- /dev/null +++ b/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/client/Response.java @@ -0,0 +1,155 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.rest.client; + +import java.io.InputStream; + +import org.apache.commons.httpclient.Header; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; + +/** + * The HTTP result code, response headers, and body of a HTTP response. + */ [email protected] [email protected] +public class Response { + private int code; + private Header[] headers; + private byte[] body; + private InputStream stream; + + /** + * Constructor + * @param code the HTTP response code + */ + public Response(int code) { + this(code, null, null); + } + + /** + * Constructor + * @param code the HTTP response code + * @param headers the HTTP response headers + */ + public Response(int code, Header[] headers) { + this(code, headers, null); + } + + /** + * Constructor + * @param code the HTTP response code + * @param headers the HTTP response headers + * @param body the response body, can be null + */ + public Response(int code, Header[] headers, byte[] body) { + this.code = code; + this.headers = headers; + this.body = body; + } + + /** + * Constructor + * @param code the HTTP response code + * @param headers headers the HTTP response headers + * @param body the response body, can be null + * @param in Inputstream if the response had one. 
+ */ + public Response(int code, Header[] headers, byte[] body, InputStream in) { + this.code = code; + this.headers = headers; + this.body = body; + this.stream = in; + } + + /** + * @return the HTTP response code + */ + public int getCode() { + return code; + } + + /** + * Gets the input stream instance. + * + * @return an instance of InputStream class. + */ + public InputStream getStream(){ + return this.stream; + } + + /** + * @return the HTTP response headers + */ + public Header[] getHeaders() { + return headers; + } + + public String getHeader(String key) { + for (Header header: headers) { + if (header.getName().equalsIgnoreCase(key)) { + return header.getValue(); + } + } + return null; + } + + /** + * @return the value of the Location header + */ + public String getLocation() { + return getHeader("Location"); + } + + /** + * @return true if a response body was sent + */ + public boolean hasBody() { + return body != null; + } + + /** + * @return the HTTP response body + */ + public byte[] getBody() { + return body; + } + + /** + * @param code the HTTP response code + */ + public void setCode(int code) { + this.code = code; + } + + /** + * @param headers the HTTP response headers + */ + public void setHeaders(Header[] headers) { + this.headers = headers; + } + + /** + * @param body the response body + */ + public void setBody(byte[] body) { + this.body = body; + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/052a6f07/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/filter/AuthFilter.java ---------------------------------------------------------------------- diff --git a/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/filter/AuthFilter.java b/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/filter/AuthFilter.java new file mode 100644 index 0000000..6d68cdd --- /dev/null +++ b/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/filter/AuthFilter.java @@ -0,0 +1,82 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + 
* or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.rest.filter; + +import static org.apache.hadoop.hbase.rest.Constants.REST_AUTHENTICATION_PRINCIPAL; +import static org.apache.hadoop.hbase.rest.Constants.REST_DNS_INTERFACE; +import static org.apache.hadoop.hbase.rest.Constants.REST_DNS_NAMESERVER; + +import java.io.IOException; +import java.util.Map; +import java.util.Properties; + +import javax.servlet.FilterConfig; +import javax.servlet.ServletException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.util.Strings; +import org.apache.hadoop.net.DNS; +import org.apache.hadoop.security.SecurityUtil; +import org.apache.hadoop.security.authentication.server.AuthenticationFilter; + +public class AuthFilter extends AuthenticationFilter { + private static final Log LOG = LogFactory.getLog(AuthFilter.class); + private static final String REST_PREFIX = "hbase.rest.authentication."; + private static final int REST_PREFIX_LEN = REST_PREFIX.length(); + + /** + * Returns the configuration to be used by the authentication filter + * to initialize the authentication handler. 
+ * + * This filter retrieves all HBase configurations and passes those started + * with REST_PREFIX to the authentication handler. It is useful to support + * plugging different authentication handlers. + */ + @Override + protected Properties getConfiguration( + String configPrefix, FilterConfig filterConfig) throws ServletException { + Properties props = super.getConfiguration(configPrefix, filterConfig); + //setting the cookie path to root '/' so it is used for all resources. + props.setProperty(AuthenticationFilter.COOKIE_PATH, "/"); + + Configuration conf = HBaseConfiguration.create(); + for (Map.Entry<String, String> entry : conf) { + String name = entry.getKey(); + if (name.startsWith(REST_PREFIX)) { + String value = entry.getValue(); + if(name.equals(REST_AUTHENTICATION_PRINCIPAL)) { + try { + String machineName = Strings.domainNamePointerToHostName( + DNS.getDefaultHost(conf.get(REST_DNS_INTERFACE, "default"), + conf.get(REST_DNS_NAMESERVER, "default"))); + value = SecurityUtil.getServerPrincipal(value, machineName); + } catch (IOException ie) { + throw new ServletException("Failed to retrieve server principal", ie); + } + } + LOG.debug("Setting property " + name + "=" + value); + name = name.substring(REST_PREFIX_LEN); + props.setProperty(name, value); + } + } + return props; + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/052a6f07/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/filter/GZIPRequestStream.java ---------------------------------------------------------------------- diff --git a/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/filter/GZIPRequestStream.java b/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/filter/GZIPRequestStream.java new file mode 100644 index 0000000..02957e9 --- /dev/null +++ b/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/filter/GZIPRequestStream.java @@ -0,0 +1,58 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.rest.filter; + +import java.io.IOException; +import java.util.zip.GZIPInputStream; + +import javax.servlet.ServletInputStream; +import javax.servlet.http.HttpServletRequest; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; + [email protected] +public class GZIPRequestStream extends ServletInputStream +{ + private GZIPInputStream in; + + public GZIPRequestStream(HttpServletRequest request) throws IOException { + this.in = new GZIPInputStream(request.getInputStream()); + } + + @Override + public int read() throws IOException { + return in.read(); + } + + @Override + public int read(byte[] b) throws IOException { + return in.read(b); + } + + @Override + public int read(byte[] b, int off, int len) throws IOException { + return in.read(b, off, len); + } + + @Override + public void close() throws IOException { + in.close(); + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/052a6f07/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/filter/GZIPRequestWrapper.java ---------------------------------------------------------------------- diff --git a/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/filter/GZIPRequestWrapper.java b/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/filter/GZIPRequestWrapper.java new 
file mode 100644 index 0000000..361e442 --- /dev/null +++ b/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/filter/GZIPRequestWrapper.java @@ -0,0 +1,52 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.rest.filter; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; + +import javax.servlet.ServletInputStream; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletRequestWrapper; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; + [email protected] +public class GZIPRequestWrapper extends HttpServletRequestWrapper { + private ServletInputStream is; + private BufferedReader reader; + + public GZIPRequestWrapper(HttpServletRequest request) throws IOException { + super(request); + this.is = new GZIPRequestStream(request); + this.reader = new BufferedReader(new InputStreamReader(this.is)); + } + + @Override + public ServletInputStream getInputStream() throws IOException { + return is; + } + + @Override + public BufferedReader getReader() throws IOException { + return reader; + } +} 
http://git-wip-us.apache.org/repos/asf/hbase/blob/052a6f07/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/filter/GZIPResponseStream.java ---------------------------------------------------------------------- diff --git a/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/filter/GZIPResponseStream.java b/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/filter/GZIPResponseStream.java new file mode 100644 index 0000000..cc74f9c --- /dev/null +++ b/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/filter/GZIPResponseStream.java @@ -0,0 +1,78 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hbase.rest.filter; + +import java.io.IOException; +import java.util.zip.GZIPOutputStream; + +import javax.servlet.ServletOutputStream; +import javax.servlet.http.HttpServletResponse; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; + [email protected] +public class GZIPResponseStream extends ServletOutputStream +{ + private HttpServletResponse response; + private GZIPOutputStream out; + + public GZIPResponseStream(HttpServletResponse response) throws IOException { + this.response = response; + this.out = new GZIPOutputStream(response.getOutputStream()); + response.addHeader("Content-Encoding", "gzip"); + } + + public void resetBuffer() { + if (out != null && !response.isCommitted()) { + response.setHeader("Content-Encoding", null); + } + out = null; + } + + @Override + public void write(int b) throws IOException { + out.write(b); + } + + @Override + public void write(byte[] b) throws IOException { + out.write(b); + } + + @Override + public void write(byte[] b, int off, int len) throws IOException { + out.write(b, off, len); + } + + @Override + public void close() throws IOException { + finish(); + out.close(); + } + + @Override + public void flush() throws IOException { + out.flush(); + } + + public void finish() throws IOException { + out.finish(); + } +}
