http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0ea130ef/eagle-core/eagle-query/eagle-storage-base/src/main/java/org/apache/eagle/storage/result/ModifyResult.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-query/eagle-storage-base/src/main/java/org/apache/eagle/storage/result/ModifyResult.java b/eagle-core/eagle-query/eagle-storage-base/src/main/java/org/apache/eagle/storage/result/ModifyResult.java deleted file mode 100644 index 4c0f56b..0000000 --- a/eagle-core/eagle-query/eagle-storage-base/src/main/java/org/apache/eagle/storage/result/ModifyResult.java +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.storage.result; - -import java.util.List; - -/** - * @since 3/18/15 - */ -public class ModifyResult<I> extends Result { - public List<I> getIdentifiers() { - return identifiers; - } - - public void setIdentifiers(List<I> identifiers) { - this.identifiers = identifiers; - } - - private List<I> identifiers; -} \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0ea130ef/eagle-core/eagle-query/eagle-storage-base/src/main/java/org/apache/eagle/storage/result/QueryResult.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-query/eagle-storage-base/src/main/java/org/apache/eagle/storage/result/QueryResult.java b/eagle-core/eagle-query/eagle-storage-base/src/main/java/org/apache/eagle/storage/result/QueryResult.java deleted file mode 100644 index 496315a..0000000 --- a/eagle-core/eagle-query/eagle-storage-base/src/main/java/org/apache/eagle/storage/result/QueryResult.java +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.eagle.storage.result; - -import java.util.List; - -/** - * @since 3/18/15 - */ -public class QueryResult<E> extends Result { - - - public List<E> getData() { - return data; - } - - public void setData(List<E> data) { - this.data = data; - } - - private List<E> data; - private Long firstTimestamp; - - public Long getLastTimestamp() { - return lastTimestamp; - } - - public void setLastTimestamp(Long lastTimestamp) { - this.lastTimestamp = lastTimestamp; - } - - public Long getFirstTimestamp() { - return firstTimestamp; - } - - public void setFirstTimestamp(Long firstTimestamp) { - this.firstTimestamp = firstTimestamp; - } - - private Long lastTimestamp; - - public Class<E> getEntityType() { - return entityType; - } - - public void setEntityType(Class<E> entityType) { - this.entityType = entityType; - } - - private Class<E> entityType; - - -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0ea130ef/eagle-core/eagle-query/eagle-storage-base/src/main/java/org/apache/eagle/storage/result/Result.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-query/eagle-storage-base/src/main/java/org/apache/eagle/storage/result/Result.java b/eagle-core/eagle-query/eagle-storage-base/src/main/java/org/apache/eagle/storage/result/Result.java deleted file mode 100644 index 492be08..0000000 --- a/eagle-core/eagle-query/eagle-storage-base/src/main/java/org/apache/eagle/storage/result/Result.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.storage.result; - -/** - * @since 3/18/15 - */ -public class Result { - public boolean isSuccess() { - return success; - } - - public void setSuccess(boolean success) { - this.success = success; - } - private boolean success; - - private int size; - - public void setSize(int size) { - this.size = size; - } - - public int getSize() { - return size; - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0ea130ef/eagle-core/eagle-query/eagle-storage-base/src/main/java/org/apache/eagle/storage/spi/DataStorageServiceLoader.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-query/eagle-storage-base/src/main/java/org/apache/eagle/storage/spi/DataStorageServiceLoader.java b/eagle-core/eagle-query/eagle-storage-base/src/main/java/org/apache/eagle/storage/spi/DataStorageServiceLoader.java deleted file mode 100644 index da59a18..0000000 --- a/eagle-core/eagle-query/eagle-storage-base/src/main/java/org/apache/eagle/storage/spi/DataStorageServiceLoader.java +++ /dev/null @@ -1,94 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.storage.spi; - -import org.apache.eagle.storage.exception.IllegalDataStorageTypeException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.*; - -/** - * @since 3/20/15 - */ -public class DataStorageServiceLoader { - private final Logger LOG = LoggerFactory.getLogger(DataStorageServiceLoader.class); - private final ServiceLoader<DataStorageServiceProvider> serviceLoader; - private final Map<String,DataStorageServiceProvider> storageServiceProviders; - - private DataStorageServiceLoader(){ - serviceLoader = ServiceLoader.load(DataStorageServiceProvider.class); - storageServiceProviders = new HashMap<String,DataStorageServiceProvider>(); - - // Load storage providers - load(); - } - - private static DataStorageServiceLoader instance; - public static DataStorageServiceLoader getInstance(){ - if(instance == null){ - instance = new DataStorageServiceLoader(); - } - return instance; - } - - private void load(){ - Iterator<DataStorageServiceProvider> dataStorageServiceLoaders = serviceLoader.iterator(); - while(dataStorageServiceLoaders.hasNext()){ - DataStorageServiceProvider provider = dataStorageServiceLoaders.next(); - String storageType = provider.getType(); - - if(storageServiceProviders.containsKey(storageType)) { - LOG.warn("Overrode storage provider: type =" + storageType + ", provider = " + provider); - }else if(storageType == null){ - LOG.error("Loaded storage provider: type = null , provider = " + provider); - throw new IllegalArgumentException("storage type is null from provider: 
"+provider); - }else{ - LOG.info("Loaded storage provider: type = " + storageType + ", provider = " + provider); - } - this.storageServiceProviders.put(storageType, provider); - } - - LOG.info("Successfully loaded storage engines: "+this.getStorageTypes()); - } - - /** - * Get supported storage types - * - * @return supported storage types - */ - public Set<String> getStorageTypes(){ - return this.storageServiceProviders.keySet(); - } - - /** - * Reload storage providers - */ - @SuppressWarnings("unused") - public void reload(){ - serviceLoader.reload(); - storageServiceProviders.clear(); - load(); - } - - public DataStorageServiceProvider getStorageProviderByType(String type) throws IllegalDataStorageTypeException { - if(!storageServiceProviders.containsKey(type)){ - throw new IllegalDataStorageTypeException("unknown storage type: "+type+", support: "+this.getStorageTypes()); - } - return storageServiceProviders.get(type); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0ea130ef/eagle-core/eagle-query/eagle-storage-base/src/main/java/org/apache/eagle/storage/spi/DataStorageServiceProvider.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-query/eagle-storage-base/src/main/java/org/apache/eagle/storage/spi/DataStorageServiceProvider.java b/eagle-core/eagle-query/eagle-storage-base/src/main/java/org/apache/eagle/storage/spi/DataStorageServiceProvider.java deleted file mode 100644 index 8533206..0000000 --- a/eagle-core/eagle-query/eagle-storage-base/src/main/java/org/apache/eagle/storage/spi/DataStorageServiceProvider.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.storage.spi; - -import org.apache.eagle.storage.DataStorage; - -/** - * @since 3/20/15 - */ -public interface DataStorageServiceProvider<T extends DataStorage> { - /** - * @return unique storage type - */ - String getType(); - - /** - * - * @return storage instance - */ - T getStorage(); -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0ea130ef/eagle-core/eagle-query/eagle-storage-base/src/main/resources/META-INF/services/org.apache.eagle.storage.spi.DataStorageServiceProvider ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-query/eagle-storage-base/src/main/resources/META-INF/services/org.apache.eagle.storage.spi.DataStorageServiceProvider b/eagle-core/eagle-query/eagle-storage-base/src/main/resources/META-INF/services/org.apache.eagle.storage.spi.DataStorageServiceProvider deleted file mode 100644 index 9f19f5f..0000000 --- a/eagle-core/eagle-query/eagle-storage-base/src/main/resources/META-INF/services/org.apache.eagle.storage.spi.DataStorageServiceProvider +++ /dev/null @@ -1,16 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. 
-# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -# org.apache.eagle.storage.spi.DataStorageServiceProvider \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0ea130ef/eagle-core/eagle-query/eagle-storage-base/src/test/java/org/apache/eagle/storage/TestDataStorageLoader.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-query/eagle-storage-base/src/test/java/org/apache/eagle/storage/TestDataStorageLoader.java b/eagle-core/eagle-query/eagle-storage-base/src/test/java/org/apache/eagle/storage/TestDataStorageLoader.java deleted file mode 100644 index abfed52..0000000 --- a/eagle-core/eagle-query/eagle-storage-base/src/test/java/org/apache/eagle/storage/TestDataStorageLoader.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.storage; - -import org.apache.eagle.storage.exception.IllegalDataStorageException; -import org.apache.eagle.storage.exception.IllegalDataStorageTypeException; -import org.apache.eagle.storage.spi.TestDataStorage; -import org.apache.commons.configuration.AbstractConfiguration; -import org.apache.commons.configuration.CombinedConfiguration; -import org.junit.Test; - -import java.util.Properties; - -/** - * @since 3/18/15 - */ -public class TestDataStorageLoader { - @Test - public void testDataStorage() throws IllegalDataStorageTypeException, IllegalDataStorageException { - DataStorage dataStorage = DataStorageManager.newDataStorage("test"); - assert dataStorage instanceof TestDataStorage; - - // get eagle.storage.type (value: test) from src/test/resources/application.conf - DataStorage dataStorage2 = DataStorageManager.getDataStorageByEagleConfig(); - assert dataStorage2 instanceof TestDataStorage; - - AbstractConfiguration configuration = new CombinedConfiguration(); - configuration.addProperty(DataStorageManager.EAGLE_STORAGE_TYPE,"test"); - DataStorage dataStorage3 = DataStorageManager.newDataStorage(configuration); - assert dataStorage3 instanceof TestDataStorage; - - Properties properties = new Properties(); - properties.put(DataStorageManager.EAGLE_STORAGE_TYPE, "test"); - DataStorage dataStorage4 = DataStorageManager.newDataStorage(properties); - assert dataStorage4 instanceof TestDataStorage; - } -} \ No newline at end of file 
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0ea130ef/eagle-core/eagle-query/eagle-storage-base/src/test/java/org/apache/eagle/storage/TestUri.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-query/eagle-storage-base/src/test/java/org/apache/eagle/storage/TestUri.java b/eagle-core/eagle-query/eagle-storage-base/src/test/java/org/apache/eagle/storage/TestUri.java deleted file mode 100644 index 9f42e29..0000000 --- a/eagle-core/eagle-query/eagle-storage-base/src/test/java/org/apache/eagle/storage/TestUri.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.eagle.storage; - -import junit.framework.Assert; -import org.junit.Test; - -import java.net.URI; - -/** - * @since 3/23/15 - */ -public class TestUri { - @Test - public void testUri(){ - String url = "eagle:hbase://zk1:2181,zk2:2181/hbase?connectionTimeout=12"; - String cleanURI = url.substring(6); - - URI uri = URI.create(cleanURI); - Assert.assertEquals("hbase",uri.getScheme()); - - // the problem is here, can not parse host and port - Assert.assertEquals(null,uri.getHost()); - - Assert.assertEquals(-1,uri.getPort()); - Assert.assertEquals("/hbase",uri.getPath()); - } -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0ea130ef/eagle-core/eagle-query/eagle-storage-base/src/test/java/org/apache/eagle/storage/spi/TestDataStorage.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-query/eagle-storage-base/src/test/java/org/apache/eagle/storage/spi/TestDataStorage.java b/eagle-core/eagle-query/eagle-storage-base/src/test/java/org/apache/eagle/storage/spi/TestDataStorage.java deleted file mode 100644 index 0973242..0000000 --- a/eagle-core/eagle-query/eagle-storage-base/src/test/java/org/apache/eagle/storage/spi/TestDataStorage.java +++ /dev/null @@ -1,84 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.storage.spi; - -import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity; -import org.apache.eagle.log.entity.meta.EntityDefinition; -import org.apache.eagle.storage.DataStorageBase; -import org.apache.eagle.storage.operation.CompiledQuery; -import org.apache.eagle.storage.result.ModifyResult; -import org.apache.eagle.storage.result.QueryResult; -import org.junit.Test; - -import java.io.IOException; -import java.util.List; - -/** - * @since 3/20/15 - */ -public class TestDataStorage extends DataStorageBase { - - @Override - public void init() throws IOException { - - } - - @Override - public <E extends TaggedLogAPIEntity> ModifyResult<String> update(List<E> entities, EntityDefinition entityDefinition) throws IOException { - return null; - } - - @Override - public <E extends TaggedLogAPIEntity> ModifyResult<String> create(List<E> entities, EntityDefinition entityDefinition) throws IOException { - return null; - } - - @Override - public <E extends TaggedLogAPIEntity> ModifyResult<String> delete(List<E> entities, EntityDefinition entityDefinition) throws IOException { - return null; - } - - @Override - public ModifyResult<String> deleteByID(List<String> ids, EntityDefinition entityDefinition) throws IOException { - return null; - } - - @Override - public ModifyResult<String> delete(CompiledQuery query, EntityDefinition entityDefinition) throws IOException { - return null; - } - - @Override - public <E> QueryResult<E> query(CompiledQuery query, EntityDefinition entityDefinition) throws IOException { - return null; - } - - @Override - public <E> QueryResult<E> queryById(List<String> ids, EntityDefinition entityDefinition) throws IOException { - return null; - } - - @Override - public void close() throws IOException { - - } - - @Test - public void test() { - - } -} 
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0ea130ef/eagle-core/eagle-query/eagle-storage-base/src/test/java/org/apache/eagle/storage/spi/TestDataStorageServiceProvider.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-query/eagle-storage-base/src/test/java/org/apache/eagle/storage/spi/TestDataStorageServiceProvider.java b/eagle-core/eagle-query/eagle-storage-base/src/test/java/org/apache/eagle/storage/spi/TestDataStorageServiceProvider.java deleted file mode 100644 index 0920c10..0000000 --- a/eagle-core/eagle-query/eagle-storage-base/src/test/java/org/apache/eagle/storage/spi/TestDataStorageServiceProvider.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.eagle.storage.spi; - -import org.apache.eagle.storage.DataStorage; -import org.junit.Test; - -/** - * @since 3/20/15 - */ -public class TestDataStorageServiceProvider implements DataStorageServiceProvider { - private final static String TEST = "test"; - - @Override - public String getType() { - return TEST; - } - - @Override - public DataStorage getStorage() { - return new TestDataStorage(); - } - - @Test - public void test() { - - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0ea130ef/eagle-core/eagle-query/eagle-storage-base/src/test/resources/META-INF/services/org.apache.eagle.storage.spi.DataStorageServiceProvider ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-query/eagle-storage-base/src/test/resources/META-INF/services/org.apache.eagle.storage.spi.DataStorageServiceProvider b/eagle-core/eagle-query/eagle-storage-base/src/test/resources/META-INF/services/org.apache.eagle.storage.spi.DataStorageServiceProvider deleted file mode 100644 index 991484a..0000000 --- a/eagle-core/eagle-query/eagle-storage-base/src/test/resources/META-INF/services/org.apache.eagle.storage.spi.DataStorageServiceProvider +++ /dev/null @@ -1,16 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
-# See the License for the specific language governing permissions and -# limitations under the License. - -org.apache.eagle.storage.spi.TestDataStorageServiceProvider \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0ea130ef/eagle-core/eagle-query/eagle-storage-base/src/test/resources/application.conf ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-query/eagle-storage-base/src/test/resources/application.conf b/eagle-core/eagle-query/eagle-storage-base/src/test/resources/application.conf deleted file mode 100644 index dbaac1d..0000000 --- a/eagle-core/eagle-query/eagle-storage-base/src/test/resources/application.conf +++ /dev/null @@ -1,28 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
- -eagle{ - service{ - env="dev" - host="localhost" - port=8080 - storage-type="test" - table-name-prefixed-with-environment=false - coprocessor-enabled=false - hbase-zookeeper-quorum="localhost" - hbase-zookeeper-property-clientPort=2181 - zookeeper-znode-parent="/hbase-unsecure" - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0ea130ef/eagle-core/eagle-query/eagle-storage-base/src/test/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-query/eagle-storage-base/src/test/resources/log4j.properties b/eagle-core/eagle-query/eagle-storage-base/src/test/resources/log4j.properties deleted file mode 100644 index d59ded6..0000000 --- a/eagle-core/eagle-query/eagle-storage-base/src/test/resources/log4j.properties +++ /dev/null @@ -1,21 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
- -log4j.rootLogger=INFO, stdout - -# standard output -log4j.appender.stdout=org.apache.log4j.ConsoleAppender -log4j.appender.stdout.layout=org.apache.log4j.PatternLayout -log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %p [%t] %c{2}[%L]: %m%n \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0ea130ef/eagle-core/eagle-query/eagle-storage-hbase/pom.xml ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-query/eagle-storage-hbase/pom.xml b/eagle-core/eagle-query/eagle-storage-hbase/pom.xml deleted file mode 100644 index 16762fe..0000000 --- a/eagle-core/eagle-query/eagle-storage-hbase/pom.xml +++ /dev/null @@ -1,103 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- - ~ Licensed to the Apache Software Foundation (ASF) under one or more - ~ contributor license agreements. See the NOTICE file distributed with - ~ this work for additional information regarding copyright ownership. - ~ The ASF licenses this file to You under the Apache License, Version 2.0 - ~ (the "License"); you may not use this file except in compliance with - ~ the License. You may obtain a copy of the License at - ~ - ~ http://www.apache.org/licenses/LICENSE-2.0 - ~ - ~ Unless required by applicable law or agreed to in writing, software - ~ distributed under the License is distributed on an "AS IS" BASIS, - ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - ~ See the License for the specific language governing permissions and - ~ limitations under the License. 
- --> - -<project xmlns="http://maven.apache.org/POM/4.0.0" - xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> - <parent> - <artifactId>eagle-query-parent</artifactId> - <groupId>eagle</groupId> - <version>0.3.0</version> - <relativePath>../pom.xml</relativePath> - </parent> - <modelVersion>4.0.0</modelVersion> - - <artifactId>eagle-storage-hbase</artifactId> - - <dependencies> - <!-- added for jira EAGLE-47 --> - <dependency> - <groupId>org.springframework.security</groupId> - <artifactId>spring-security-core</artifactId> - <version>${spring.framework.version}</version> - </dependency> - <dependency> - <groupId>eagle</groupId> - <artifactId>eagle-audit-base</artifactId> - <version>${project.version}</version> - </dependency> - <dependency> - <groupId>eagle</groupId> - <artifactId>eagle-common</artifactId> - <version>${project.version}</version> - </dependency> - - <dependency> - <groupId>eagle</groupId> - <artifactId>eagle-storage-base</artifactId> - <version>${project.version}</version> - </dependency> - <dependency> - <groupId>org.apache.hbase</groupId> - <artifactId>hbase-server</artifactId> - </dependency> - <dependency> - <groupId>org.apache.hbase</groupId> - <artifactId>hbase-client</artifactId> - </dependency> - <dependency> - <groupId>eagle</groupId> - <artifactId>eagle-embed-hbase</artifactId> - <version>${project.version}</version> - <scope>test</scope> - </dependency> - <dependency> - <groupId>eagle</groupId> - <artifactId>eagle-embed-hbase</artifactId> - <version>${project.version}</version> - <classifier>tests</classifier> - <scope>test</scope> - </dependency> - <dependency> - <groupId>com.google.protobuf</groupId> - <artifactId>protobuf-java</artifactId> - <version>${protobuf-java.version}</version> - </dependency> - </dependencies> - <build> - <plugins> - <plugin> - <artifactId>maven-assembly-plugin</artifactId> - <configuration> - 
<descriptor>src/assembly/hbase-coprocessor.xml</descriptor> - </configuration> - <executions> - <execution> - <phase>package</phase> - <goals> - <goal>single</goal> - </goals> - <configuration> - <tarLongFileMode>posix</tarLongFileMode> - </configuration> - </execution> - </executions> - </plugin> - </plugins> - </build> -</project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0ea130ef/eagle-core/eagle-query/eagle-storage-hbase/src/assembly/hbase-coprocessor.xml ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-query/eagle-storage-hbase/src/assembly/hbase-coprocessor.xml b/eagle-core/eagle-query/eagle-storage-hbase/src/assembly/hbase-coprocessor.xml deleted file mode 100644 index fa80c64..0000000 --- a/eagle-core/eagle-query/eagle-storage-hbase/src/assembly/hbase-coprocessor.xml +++ /dev/null @@ -1,56 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- - ~ Licensed to the Apache Software Foundation (ASF) under one or more - ~ contributor license agreements. See the NOTICE file distributed with - ~ this work for additional information regarding copyright ownership. - ~ The ASF licenses this file to You under the Apache License, Version 2.0 - ~ (the "License"); you may not use this file except in compliance with - ~ the License. You may obtain a copy of the License at - ~ - ~ http://www.apache.org/licenses/LICENSE-2.0 - ~ - ~ Unless required by applicable law or agreed to in writing, software - ~ distributed under the License is distributed on an "AS IS" BASIS, - ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - ~ See the License for the specific language governing permissions and - ~ limitations under the License. 
- --> -<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2" - xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2 http://maven.apache.org/xsd/assembly-1.1.2.xsd"> - <id>coprocessor</id> - <formats> - <format>jar</format> - </formats> - <includeBaseDirectory>false</includeBaseDirectory> - <dependencySets> - <dependencySet> - <outputDirectory>/</outputDirectory> - <useProjectArtifact>false</useProjectArtifact> - <unpack>true</unpack> - <scope>runtime</scope> - <unpackOptions> - <excludes> - <exclude>**/hbase-site.xml</exclude> - <exclude>**/hdfs-site.xml</exclude> - <exclude>**/hbase-default.xml</exclude> - </excludes> - </unpackOptions> - <excludes> - <!--<exclude>org.apache.hbase:hbase-server</exclude>--> - <!--<exclude>org.apache.hbase:hbase-client</exclude>--> - <exclude>org.apache.hbase:*</exclude> - <exclude>org.apache.hadoop:*</exclude> - <exclude>org.slf4j:slf4j-api</exclude> - <exclude>org.slf4j:log4j-over-slf4j</exclude> - <exclude>org.slf4j:slf4j-log4j12</exclude> - </excludes> - </dependencySet> - </dependencySets> - <fileSets> - <fileSet> - <directory>${project.build.outputDirectory}</directory> - <outputDirectory>/</outputDirectory> - </fileSet> - </fileSets> -</assembly> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0ea130ef/eagle-core/eagle-query/eagle-storage-hbase/src/main/java/org/apache/eagle/storage/hbase/HBaseStorage.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-query/eagle-storage-hbase/src/main/java/org/apache/eagle/storage/hbase/HBaseStorage.java b/eagle-core/eagle-query/eagle-storage-hbase/src/main/java/org/apache/eagle/storage/hbase/HBaseStorage.java deleted file mode 100644 index 3cea080..0000000 --- a/eagle-core/eagle-query/eagle-storage-hbase/src/main/java/org/apache/eagle/storage/hbase/HBaseStorage.java +++ 
/dev/null @@ -1,258 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.storage.hbase; - -import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity; -import org.apache.eagle.log.entity.GenericEntityWriter; -import org.apache.eagle.log.entity.HBaseInternalLogHelper; -import org.apache.eagle.log.entity.InternalLog; -import org.apache.eagle.log.entity.index.RowKeyLogReader; -import org.apache.eagle.log.entity.meta.EntityDefinition; -import org.apache.eagle.log.entity.old.GenericDeleter; -import org.apache.eagle.query.GenericQuery; -import org.apache.eagle.storage.DataStorageBase; -import org.apache.eagle.storage.hbase.query.GenericQueryBuilder; -import org.apache.eagle.storage.operation.CompiledQuery; -import org.apache.eagle.storage.result.ModifyResult; -import org.apache.eagle.storage.result.QueryResult; -import org.apache.eagle.common.EagleBase64Wrapper; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.LinkedList; -import java.util.List; - -import static org.apache.eagle.audit.common.AuditConstants.AUDIT_EVENT_CREATE; -import static org.apache.eagle.audit.common.AuditConstants.AUDIT_EVENT_UPDATE; 
-import static org.apache.eagle.audit.common.AuditConstants.AUDIT_EVENT_DELETE; - -/** - * @since 3/18/15 - */ -public class HBaseStorage extends DataStorageBase { - - private final static Logger LOG = LoggerFactory.getLogger(HBaseStorage.class); - private HBaseStorageAudit audit = new HBaseStorageAudit(); - - @Override - public void init() throws IOException { - LOG.info("Initializing"); - } - - @Override - public <E extends TaggedLogAPIEntity> ModifyResult<String> update(List<E> entities, EntityDefinition entityDefinition) throws IOException { - ModifyResult<String> result = create(entities, entityDefinition); - audit.auditOperation(AUDIT_EVENT_UPDATE, entities, null, entityDefinition); // added for jira: EAGLE-47 - return result; - } - - @Override - public <E extends TaggedLogAPIEntity> ModifyResult<String> create(List<E> entities, EntityDefinition entityDefinition) throws IOException { - ModifyResult<String> result = new ModifyResult<>(); - try { - GenericEntityWriter entityWriter = new GenericEntityWriter(entityDefinition); - result.setIdentifiers(entityWriter.write(entities)); - result.setSuccess(true); - } catch (Exception e) { - LOG.error(e.getMessage(),e); - throw new IOException(e); - } - - audit.auditOperation(AUDIT_EVENT_CREATE, entities, null, entityDefinition); // added for jira: EAGLE-47 - return result; - } - - /** - * @param entities - * @param entityDefinition - * @param <E> - * - * @return ModifyResult - * - * @throws IOException - */ - @Override - public <E extends TaggedLogAPIEntity> ModifyResult<String> delete(List<E> entities, EntityDefinition entityDefinition) throws IOException { - ModifyResult<String> result = new ModifyResult<String>(); - try{ - GenericDeleter deleter = new GenericDeleter(entityDefinition.getTable(), entityDefinition.getColumnFamily()); - result.setIdentifiers(deleter.delete(entities)); - }catch(Exception ex){ - LOG.error(ex.getMessage(),ex); - result.setSuccess(false); - throw new IOException(ex); - } - - 
audit.auditOperation(AUDIT_EVENT_DELETE, entities, null, entityDefinition); // added for jira: EAGLE-47 - result.setSuccess(true); - return result; - } - - /** - * TODO: - * - * @param ids - * @param entityDefinition - * @return - * @throws IOException - */ - @Override - public ModifyResult<String> deleteByID(List<String> ids, EntityDefinition entityDefinition) throws IOException { - ModifyResult<String> result = new ModifyResult<String>(); - try{ - GenericDeleter deleter = new GenericDeleter(entityDefinition.getTable(), entityDefinition.getColumnFamily()); - deleter.deleteByEncodedRowkeys(ids); - result.setIdentifiers(ids); - }catch(Exception ex){ - LOG.error(ex.getMessage(),ex); - result.setSuccess(false); - throw new IOException(ex); - } - - audit.auditOperation(AUDIT_EVENT_DELETE, null, ids, entityDefinition); // added for jira: EAGLE-47 - result.setSuccess(true); - return result; - } - - /** - * TODO: - * - * @param query - * @param entityDefinition - * @return - * @throws IOException - */ - @Override - public ModifyResult<String> delete(CompiledQuery query, EntityDefinition entityDefinition) throws IOException { - if(query.isHasAgg()){ - throw new IOException("delete by aggregation query is not supported"); - } - ModifyResult<String> result; - - try { - LOG.info("Querying for deleting: "+query); - GenericQuery reader = GenericQueryBuilder - .select(query.getSearchCondition().getOutputFields()) - .from(query.getServiceName(),query.getRawQuery().getMetricName()).where(query.getSearchCondition()) - .groupBy(query.isHasAgg(), query.getGroupByFields(), query.getAggregateFunctionTypes(), query.getAggregateFields()) - .timeSeries(query.getRawQuery().isTimeSeries(),query.getRawQuery().getIntervalmin()) - .treeAgg(query.getRawQuery().isTreeAgg()) - .orderBy(query.getSortOptions(),query.getSortFunctions(),query.getSortFields()) - .top(query.getRawQuery().getTop()) - .parallel(query.getRawQuery().getParallel()) - .build(); - List<? 
extends TaggedLogAPIEntity> entities = reader.result(); - if(entities != null){ - LOG.info("Deleting "+entities.size()+" entities"); - result = delete(entities,entityDefinition); - }else{ - LOG.info("Deleting 0 entities"); - result = new ModifyResult<String>(); - result.setSuccess(true); - } - } catch (Exception e) { - LOG.error(e.getMessage(),e); - throw new IOException(e); - } - return result; - } - - /** - * TODO: - * - * @param query - * @param entityDefinition - * @param <E> - * @return - * @throws IOException - */ - @Override - @SuppressWarnings("unchecked") - public <E extends Object> QueryResult<E> query(CompiledQuery query, EntityDefinition entityDefinition) throws IOException { - QueryResult<E> result = new QueryResult<E>(); - try { - GenericQuery reader = GenericQueryBuilder - .select(query.getSearchCondition().getOutputFields()) - .from(query.getServiceName(),query.getRawQuery().getMetricName()).where(query.getSearchCondition()) - .groupBy(query.isHasAgg(), query.getGroupByFields(), query.getAggregateFunctionTypes(), query.getAggregateFields()) - .timeSeries(query.getRawQuery().isTimeSeries(),query.getRawQuery().getIntervalmin()) - .treeAgg(query.getRawQuery().isTreeAgg()) - .orderBy(query.getSortOptions(),query.getSortFunctions(),query.getSortFields()) - .top(query.getRawQuery().getTop()) - .parallel(query.getRawQuery().getParallel()) - .build(); - List<E> entities = reader.result(); - result.setData(entities); - result.setFirstTimestamp(reader.getFirstTimeStamp()); - result.setLastTimestamp(reader.getLastTimestamp()); - result.setSize(entities.size()); - if(!query.isHasAgg()) result.setEntityType((Class<E>) entityDefinition.getEntityClass()); - result.setSuccess(true); - } catch (Exception e) { - LOG.error(e.getMessage(),e); - throw new IOException(e); - } - return result; - } - - /** - * Query by HBase rowkey - * - * @param ids hbase rowkey list - * @param entityDefinition entity definition - * @param <E> entity type - * @return QueryResult with 
entity type <E> - * - * @throws IOException - */ - @Override - public <E> QueryResult<E> queryById(List<String> ids, EntityDefinition entityDefinition) throws IOException { - List<byte[]> rowkeys = new ArrayList<>(ids.size()); - QueryResult<E> result = new QueryResult<E>(); - for(String id:ids) rowkeys.add(EagleBase64Wrapper.decode(id)); - RowKeyLogReader reader = null; - try { - reader = new RowKeyLogReader(entityDefinition, rowkeys,null); - reader.open(); - List<TaggedLogAPIEntity> entities = new LinkedList<>(); - - while(true) { - InternalLog log = reader.read(); - if(log == null) break; - TaggedLogAPIEntity entity = HBaseInternalLogHelper.buildEntity(log, entityDefinition); - entities.add(entity); - } - - result.setData((List<E>) entities); - result.setSuccess(true); - result.setSize(entities.size()); - return result; - } catch (Exception e) { - LOG.error(e.getMessage(), e); - throw new IOException(e); - } finally{ - if(reader != null) reader.close(); - } - } - - @Override - public void close() throws IOException { - LOG.info("Shutting down"); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0ea130ef/eagle-core/eagle-query/eagle-storage-hbase/src/main/java/org/apache/eagle/storage/hbase/HBaseStorageAudit.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-query/eagle-storage-hbase/src/main/java/org/apache/eagle/storage/hbase/HBaseStorageAudit.java b/eagle-core/eagle-query/eagle-storage-hbase/src/main/java/org/apache/eagle/storage/hbase/HBaseStorageAudit.java deleted file mode 100644 index f6a6180..0000000 --- a/eagle-core/eagle-query/eagle-storage-hbase/src/main/java/org/apache/eagle/storage/hbase/HBaseStorageAudit.java +++ /dev/null @@ -1,164 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.eagle.storage.hbase; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import org.apache.eagle.audit.common.AuditEvent; -import org.apache.eagle.audit.entity.GenericAuditEntity; -import org.apache.eagle.audit.listener.AuditListener; -import org.apache.eagle.audit.listener.AuditSupport; -import org.apache.eagle.common.config.EagleConfigFactory; -import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity; -import org.apache.eagle.log.entity.meta.EntityDefinition; -import org.apache.eagle.storage.DataStorageManager; -import org.apache.eagle.storage.exception.IllegalDataStorageTypeException; -import org.apache.eagle.storage.operation.CreateStatement; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.security.core.Authentication; -import org.springframework.security.core.context.SecurityContextHolder; - -import static org.apache.eagle.audit.common.AuditConstants.AUDIT_SERVICE_ENDPOINT; -import static org.apache.eagle.audit.common.AuditConstants.AUDIT_COLUMN_SERVICE_NAME; -import static org.apache.eagle.audit.common.AuditConstants.AUDIT_COLUMN_USER_ID; -import static org.apache.eagle.audit.common.AuditConstants.AUDIT_COLUMN_OPERATION; 
-import static org.apache.eagle.audit.common.AuditConstants.AUDIT_COLUMN_TIMESTAMP; - -/** - * Implementation of AuditListener class. - * Used in HBaseStorage class for auditing HBase operations performed. - */ -public class HBaseStorageAudit implements AuditListener { - - private final static Logger LOG = LoggerFactory.getLogger(HBaseStorageAudit.class); - private AuditSupport auditSupport = new AuditSupport(this); - - public HBaseStorageAudit() { - auditSupport.addAuditListener(this); - } - - @Override - public void auditEvent(AuditEvent event) { - LOG.info("firing audit event: " + event.toString()); - persistAuditEntity(event.getAuditEntities()); - } - - /** - * Method to be invoked for firing audit event. - * @param operation: HBase operation. Values like CREATE/UPDATE/DELETE. - * @param entities: List of entities used in HBase operation. - * @param encodedRowKeys: List of encodededRowKeys returned from successful HBase operation. To be passed only from deletebyID method. - * @param entityDefinition: EntityDefinition object used in the HBaseOperation. - */ - public void auditOperation(String operation, List<? extends TaggedLogAPIEntity> entities, List<String> encodedRowKeys, EntityDefinition entityDefinition) { - if (isAuditingRequired(entityDefinition.getService())) { - List<GenericAuditEntity> auditEntities = buildAuditEntities(operation, entities, encodedRowKeys, entityDefinition); - if (null != auditEntities && 0 != auditEntities.size()) - auditSupport.fireAudit(entityDefinition.getService(), auditEntities); - } - } - - /** - * Check if audit is required based on the service names and audit configuration. - * @param serviceName: Name of the service call. - * @return - */ - private boolean isAuditingRequired (String serviceName) { - if (EagleConfigFactory.load().isServiceAuditingEnabled() - // As per jira EAGLE-47, HBase operation level audit is done only for Policy, Site and DataSource definitions. 
- && ("AlertDefinitionService".equals(serviceName) || "AlertDataSourceService".equals(serviceName))) { - return true; - } - - return false; - } - - /** - * Build Audit entities based on the available infomration. - * @param operation: HBase operation performed. - * @param entities: List of entities used in HBase operation. - * @param encodedRowKeys: List of encodededRowKeys returned from successful HBase operation. To be passed only from deletebyID method. - * @param entityDefinition: EntityDefinition object used in the HBaseOperation. - * @return - */ - private List<GenericAuditEntity> buildAuditEntities(String operation, List<? extends TaggedLogAPIEntity> entities, List<String> encodedRowKeys, EntityDefinition entityDefinition) { - String serviceName = entityDefinition.getService(); - long timestamp = System.currentTimeMillis()/1000L; - - Authentication authentication = SecurityContextHolder.getContext().getAuthentication(); - String userID = null != authentication.getName() ? authentication.getName() : "data not available"; // empty user - - List<GenericAuditEntity> auditEntities = new ArrayList<GenericAuditEntity>(); - GenericAuditEntity auditEntity = new GenericAuditEntity(); - - if (null != entities && 0 != entities.size()) { - Map<String, String> auditTags; - for (TaggedLogAPIEntity entity : entities) { - auditTags = entity.getTags(); - auditTags.put(AUDIT_COLUMN_SERVICE_NAME, serviceName); - auditTags.put(AUDIT_COLUMN_USER_ID, userID); - auditTags.put(AUDIT_COLUMN_OPERATION, operation); - auditTags.put(AUDIT_COLUMN_TIMESTAMP, timestamp + ""); - - auditEntity = new GenericAuditEntity(); - auditEntity.setTags(auditTags); - auditEntities.add(auditEntity); - } - - return auditEntities; - } else if (null != encodedRowKeys && 0 != encodedRowKeys.size()) { // conditions yields true only in case of deleteByID - Map<String, String> auditTags; - for (String encodedRowKey : encodedRowKeys) { - auditTags = new HashMap<String, String>(); - auditTags.put("encodedRowKey", 
encodedRowKey); - auditTags.put(AUDIT_COLUMN_SERVICE_NAME, serviceName); - auditTags.put(AUDIT_COLUMN_USER_ID, userID); - auditTags.put(AUDIT_COLUMN_OPERATION, operation); - auditTags.put(AUDIT_COLUMN_TIMESTAMP, timestamp + ""); - - auditEntity = new GenericAuditEntity(); - auditEntity.setTags(auditTags); - auditEntities.add(auditEntity); - } - - return auditEntities; - } else { - return null; - } - } - - /** - * Persists audit entries into HBase. - * @param entityList - */ - private void persistAuditEntity(List<? extends TaggedLogAPIEntity> entityList) { - try { - if (null != entityList && 0 != entityList.size()) { - CreateStatement createStatement = new CreateStatement(entityList, AUDIT_SERVICE_ENDPOINT); - createStatement.execute(DataStorageManager.newDataStorage("hbase")); - } - } catch (IOException | IllegalDataStorageTypeException exception) { - LOG.error("exception in auditing storage event", exception.getMessage()); - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0ea130ef/eagle-core/eagle-query/eagle-storage-hbase/src/main/java/org/apache/eagle/storage/hbase/query/GenericQueryBuilder.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-query/eagle-storage-hbase/src/main/java/org/apache/eagle/storage/hbase/query/GenericQueryBuilder.java b/eagle-core/eagle-query/eagle-storage-hbase/src/main/java/org/apache/eagle/storage/hbase/query/GenericQueryBuilder.java deleted file mode 100755 index 3dd83cf..0000000 --- a/eagle-core/eagle-query/eagle-storage-hbase/src/main/java/org/apache/eagle/storage/hbase/query/GenericQueryBuilder.java +++ /dev/null @@ -1,158 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.storage.hbase.query; - -import java.util.List; - -import org.apache.eagle.query.GenericEntityQuery; -import org.apache.eagle.query.GenericQuery; -import org.apache.eagle.storage.hbase.query.aggregate.GenericAggregateQuery; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.eagle.log.entity.SearchCondition; -import org.apache.eagle.query.aggregate.AggregateCondition; -import org.apache.eagle.query.aggregate.AggregateFunctionType; -import org.apache.eagle.query.aggregate.timeseries.SortOption; - -
/**
 * Fluent builder that assembles either a {@link GenericAggregateQuery} (when aggregation is
 * requested through {@link #groupBy}) or a plain {@link GenericEntityQuery}.
 *
 * TODO: decouple into eagle-query-base module
 *
 * @since : 10/30/14,2014
 */
public class GenericQueryBuilder {
    private static final Logger LOG = LoggerFactory.getLogger(GenericQueryBuilder.class);

    /** Conversion factor from the minute-based interval to milliseconds. */
    private static final long MILLIS_PER_MINUTE = 60L * 1000L;

    private List<String> outputFields;
    private String serviceName;
    private String metricName;
    private SearchCondition searchCondition;
    private boolean hasAgg;
    private List<String> groupByFields;
    private List<AggregateFunctionType> aggregateFuncTypes;
    private List<String> aggregateFields;
    private boolean timeSeries = false;
    private long intervalmin;
    private List<SortOption> sortOptions;
    private int top;
    private List<AggregateFunctionType> sortFunctionTypes;
    private List<String> sortFields;

    /**
     * Entry point: creates a builder with the given output fields.
     *
     * @param outputFields fields to select
     * @return a new builder
     */
    public static GenericQueryBuilder select(List<String> outputFields) {
        GenericQueryBuilder builder = new GenericQueryBuilder();
        builder.output(outputFields);
        return builder;
    }

    public GenericQueryBuilder output(List<String> outputFields) {
        this.outputFields = outputFields;
        return this;
    }

    public GenericQueryBuilder from(String serviceName, String metricName) {
        this.serviceName = serviceName;
        this.metricName = metricName;
        return this;
    }

    public GenericQueryBuilder where(SearchCondition condition) {
        this.searchCondition = condition;
        return this;
    }

    /**
     * TODO: Parameter "parallel" no longer supported, ignore
     *
     * @param parallel ignored; a warning is logged when it is positive
     * @return this builder
     */
    @Deprecated
    public GenericQueryBuilder parallel(int parallel) {
        if (parallel > 0) {
            LOG.warn("Parameter \"parallel\" is deprecated, ignore");
        }
        return this;
    }

    /**
     * Configures aggregation.
     *
     * @param hasAgg whether {@link #build()} should produce an aggregate query
     * @param groupByFields fields to group by
     * @param aggregateFunctionTypes aggregate functions to apply
     * @param aggregateFields fields the aggregate functions are applied to
     * @return this builder
     */
    public GenericQueryBuilder groupBy(boolean hasAgg, List<String> groupByFields, List<AggregateFunctionType> aggregateFunctionTypes, List<String> aggregateFields) {
        this.hasAgg = hasAgg;
        this.groupByFields = groupByFields;
        this.aggregateFuncTypes = aggregateFunctionTypes;
        this.aggregateFields = aggregateFields;
        return this;
    }

    public GenericQueryBuilder timeSeries(boolean timeSeries, long intervalMin) {
        this.timeSeries = timeSeries;
        this.intervalmin = intervalMin;
        return this;
    }

    public GenericQueryBuilder orderBy(List<SortOption> sortOptions, List<AggregateFunctionType> sortFunctionTypes, List<String> sortFields) {
        this.sortOptions = sortOptions;
        this.sortFunctionTypes = sortFunctionTypes;
        this.sortFields = sortFields;
        return this;
    }

    public GenericQueryBuilder top(int top) {
        this.top = top;
        return this;
    }

    /**
     * TODO: Parameter "treeAgg" no longer supported, ignore
     *
     * @param treeAgg ignored; a warning is logged when it is true
     * @return this builder
     */
    @Deprecated
    public GenericQueryBuilder treeAgg(boolean treeAgg) {
        if (treeAgg) {
            LOG.warn("Parameter \"treeAgg\" is deprecated, ignore");
        }
        return this;
    }

    /**
     * Builds the query described by this builder.
     *
     * @return a {@link GenericAggregateQuery} when aggregation was requested, otherwise a
     *         {@link GenericEntityQuery}
     * @throws Exception if the underlying query cannot be constructed
     */
    public GenericQuery build() throws Exception {
        if (!hasAgg) {
            LOG.debug("Build GenericBatchQuery");
            return new GenericEntityQuery(this.serviceName, this.searchCondition, this.metricName);
        }

        LOG.debug("Build GroupAggregateQuery");
        AggregateCondition aggregateCondition = new AggregateCondition();
        aggregateCondition.setGroupbyFields(this.groupByFields);
        aggregateCondition.setAggregateFunctionTypes(this.aggregateFuncTypes);
        aggregateCondition.setAggregateFields(this.aggregateFields);
        aggregateCondition.setTimeSeries(this.timeSeries);
        aggregateCondition.setIntervalMS(this.intervalmin * MILLIS_PER_MINUTE);
        return new GenericAggregateQuery(this.serviceName,
                this.searchCondition,
                aggregateCondition,
                this.metricName,
                this.sortOptions, this.sortFunctionTypes, this.sortFields,
                this.top);
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0ea130ef/eagle-core/eagle-query/eagle-storage-hbase/src/main/java/org/apache/eagle/storage/hbase/query/aggregate/GenericAggregateQuery.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-query/eagle-storage-hbase/src/main/java/org/apache/eagle/storage/hbase/query/aggregate/GenericAggregateQuery.java b/eagle-core/eagle-query/eagle-storage-hbase/src/main/java/org/apache/eagle/storage/hbase/query/aggregate/GenericAggregateQuery.java deleted file mode 100755 index 8f20b61..0000000 --- a/eagle-core/eagle-query/eagle-storage-hbase/src/main/java/org/apache/eagle/storage/hbase/query/aggregate/GenericAggregateQuery.java +++ /dev/null @@ -1,423 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.storage.hbase.query.aggregate; - -import org.apache.eagle.log.entity.GenericMetricEntity; -import org.apache.eagle.log.entity.HBaseInternalLogHelper; -import org.apache.eagle.log.entity.SearchCondition; -import org.apache.eagle.log.entity.meta.EntityConstants; -import org.apache.eagle.log.entity.meta.EntityDefinition; -import org.apache.eagle.log.entity.meta.EntityDefinitionManager; -import org.apache.eagle.query.GenericQuery; -import org.apache.eagle.query.QueryConstants; -import org.apache.eagle.query.aggregate.AggregateCondition; -import org.apache.eagle.query.aggregate.AggregateFunctionType; -import org.apache.eagle.query.aggregate.raw.GroupbyKey; -import org.apache.eagle.query.aggregate.raw.GroupbyKeyValue; -import org.apache.eagle.query.aggregate.raw.GroupbyValue; -import org.apache.eagle.query.aggregate.timeseries.PostFlatAggregateSort; -import org.apache.eagle.query.aggregate.timeseries.SortOption; -import org.apache.eagle.query.aggregate.timeseries.TimeSeriesAggregator; -import org.apache.eagle.query.aggregate.timeseries.TimeSeriesPostFlatAggregateSort; -import org.apache.eagle.storage.hbase.query.coprocessor.AggregateResult; -import org.apache.eagle.storage.hbase.query.coprocessor.impl.AggregateResultCallbackImpl; -import 
org.apache.eagle.common.DateTimeUtil; -import org.apache.hadoop.io.BytesWritable; -import org.apache.hadoop.io.DoubleWritable; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.io.UnsupportedEncodingException; -import java.util.*; - -/** - * AggregateQuery - * - * <ol> - * <li>Open HBase connection</li> - * <li>Aggregate through Coprocessor</li> - * <li>Build GroupAggregateQuery.GroupAggregateQueryReader to process result and order as sort options</li> - * <li>Return result list</li> - * </ol> - * - * @since : 11/7/14,2014 - */ -public class GenericAggregateQuery implements GenericQuery { - private static final Logger LOG = LoggerFactory.getLogger(GenericAggregateQuery.class); - private final List<AggregateFunctionType> sortFuncs; - private final List<String> sortFields; - - private EntityDefinition entityDef; - private SearchCondition searchCondition; - private AggregateCondition aggregateCondition; - private String prefix; - private long lastTimestamp = 0; - private long firstTimestamp = 0; - private List<SortOption> sortOptions; - private int top; - - private int aggFuncNum; - private int sortAggFuncNum; - private int sortFuncNum; - - /** - * - * @param serviceName - * @param condition - * @param aggregateCondition - * @param metricName - * @throws InstantiationException - * @throws IllegalAccessException - */ - public GenericAggregateQuery(String serviceName, SearchCondition condition, AggregateCondition aggregateCondition, String metricName) - throws InstantiationException, IllegalAccessException{ - this(serviceName, condition, aggregateCondition, metricName,null,null,null,0); - } - - /** - * - * @param serviceName - * @param condition - * @param aggregateCondition - * @param metricName - * @param sortOptions - * @param sortFunctionTypes - * @param sortFields - * @param top - * @throws InstantiationException - * @throws IllegalAccessException - */ - public GenericAggregateQuery(String serviceName, 
SearchCondition condition, - AggregateCondition aggregateCondition, String metricName, - List<SortOption> sortOptions,List<AggregateFunctionType> sortFunctionTypes,List<String> sortFields,int top) - throws InstantiationException, IllegalAccessException{ - checkNotNull(serviceName, "serviceName"); - this.searchCondition = condition; - this.entityDef = EntityDefinitionManager.getEntityByServiceName(serviceName); - checkNotNull(entityDef, "EntityDefinition"); - checkNotNull(entityDef, "GroupAggregateCondition"); - this.aggregateCondition = aggregateCondition; - this.aggFuncNum = this.aggregateCondition.getAggregateFunctionTypes().size(); - this.sortOptions = sortOptions; - this.sortFuncs = sortFunctionTypes; - this.sortFuncNum = this.sortOptions == null ? 0: this.sortOptions.size(); - this.sortFields = sortFields; - this.top = top; - - if(serviceName.equals(GenericMetricEntity.GENERIC_METRIC_SERVICE)){ - if(LOG.isDebugEnabled()) LOG.debug("list metric aggregate query"); - if(metricName == null || metricName.isEmpty()){ - throw new IllegalArgumentException("metricName should not be empty for metric list query"); - } - if(!condition.getOutputFields().contains(GenericMetricEntity.VALUE_FIELD)){ - condition.getOutputFields().add(GenericMetricEntity.VALUE_FIELD); - } - this.prefix = metricName; - }else{ - if(LOG.isDebugEnabled()) LOG.debug("list entity aggregate query"); - this.prefix = entityDef.getPrefix(); - } - - // Add sort oriented aggregation functions into aggregateCondtion - if(this.sortOptions!=null){ - // if sort for time series aggregation - if(this.aggregateCondition.isTimeSeries()) { - this.sortAggFuncNum = 0; - int index = 0; - for (SortOption sortOption : this.sortOptions) { - if (!sortOption.isInGroupby()) { - if (LOG.isDebugEnabled()) - LOG.debug("Add additional aggregation functions for sort options " + sortOption.toString() + " in index: " + (this.aggFuncNum + this.sortAggFuncNum)); - AggregateFunctionType _sortFunc = this.sortFuncs.get(index); - if 
(AggregateFunctionType.avg.equals(_sortFunc)) { - this.aggregateCondition.getAggregateFunctionTypes().add(AggregateFunctionType.sum); - } else { - this.aggregateCondition.getAggregateFunctionTypes().add(_sortFunc); - } - this.aggregateCondition.getAggregateFields().add(this.sortFields.get(index)); - - sortOption.setIndex(this.sortAggFuncNum); - sortAggFuncNum++; - } - index++; - } - } - } - } - - - private void checkNotNull(Object o, String message){ - if(o == null){ - throw new IllegalArgumentException(message + " should not be null"); - } - } - - /** - * TODO: Return List<GroupAggregateAPIEntity> - * - * @see GenericAggregateQuery.TimeSeriesGroupAggregateQueryReader#result() - * @see GenericAggregateQuery.FlatGroupAggregateQueryReader#result() - * - */ - @Override - @SuppressWarnings("raw") - public List result() throws Exception { - Date start = null; - Date end = null; - // shortcut to avoid read when pageSize=0 - if(searchCondition.getPageSize() <= 0){ - return null; - } - // Process the time range if needed - if(entityDef.isTimeSeries()){ - start = DateTimeUtil.humanDateToDate(searchCondition.getStartTime()); - end = DateTimeUtil.humanDateToDate(searchCondition.getEndTime()); - }else{ - start = DateTimeUtil.humanDateToDate(EntityConstants.FIXED_READ_START_HUMANTIME); - end = DateTimeUtil.humanDateToDate(EntityConstants.FIXED_READ_END_HUMANTIME); - } - // Generate the output qualifiers - final byte[][] outputQualifiers = HBaseInternalLogHelper.getOutputQualifiers(entityDef, searchCondition.getOutputFields()); - GenericAggregateReader reader = new GenericAggregateReader(entityDef, - searchCondition.getPartitionValues(), - start, end, searchCondition.getFilter(), searchCondition.getStartRowkey(), outputQualifiers, this.prefix,this.aggregateCondition); - try{ - if(LOG.isDebugEnabled()) LOG.debug("open and read group aggregate reader"); - reader.open(); - List result = buildGroupAggregateQueryReader(reader,this.aggregateCondition.isTimeSeries()).result(); - 
if(result == null) throw new IOException("result is null"); - this.firstTimestamp = reader.getFirstTimestamp(); - this.lastTimestamp = reader.getLastTimestamp(); - if(LOG.isDebugEnabled()) LOG.debug("finish read aggregated " + result.size() + " rows"); - return result; - }catch (IOException ex){ - LOG.error("Fail reading aggregated results", ex); - throw ex; - }finally{ - if(reader != null) { - if(LOG.isDebugEnabled()) LOG.debug("Release HBase connection"); - reader.close(); - } - } - } - - /////////////////////////////////////////////////////////// - // GroupAggregateQueryReader(GroupAggregateLogReader) - // |_ FlatGroupAggregateQueryReader - // |_ TimeSeriesGroupAggregateQueryReader - /////////////////////////////////////////////////////////// - - /** - * Factory method for {@link GroupAggregateQueryReader} - * <pre> - * {@link GroupAggregateQueryReader} - * |_ {@link FlatGroupAggregateQueryReader} - * |_ {@link TimeSeriesGroupAggregateQueryReader} - * </pre> - * @param reader - * @param isTimeSeries - * @return - * @throws IOException - */ - private GroupAggregateQueryReader buildGroupAggregateQueryReader(GenericAggregateReader reader,boolean isTimeSeries) throws IOException{ - if(isTimeSeries){ - return new TimeSeriesGroupAggregateQueryReader(reader,this); - }else{ - return new FlatGroupAggregateQueryReader(reader,this); - } - } - - private abstract class GroupAggregateQueryReader { - protected final GenericAggregateReader reader; - protected final GenericAggregateQuery query; - - public GroupAggregateQueryReader(GenericAggregateReader reader, GenericAggregateQuery query){ - this.reader = reader; - this.query = query; - } - public abstract <T> List<T> result() throws Exception; - - protected Map<List<String>, List<Double>> keyValuesToMap(List<GroupbyKeyValue> entities) throws Exception { - Map<List<String>, List<Double>> aggResultMap = new HashMap<List<String>, List<Double>>(); - try { - for(GroupbyKeyValue keyValue:entities){ - List<String> key = new 
ArrayList<String>(); - for(BytesWritable bw:keyValue.getKey().getValue()){ - key.add(new String(bw.copyBytes(), QueryConstants.CHARSET)); - } - List<Double> value = new ArrayList<Double>(); - for(DoubleWritable wa:keyValue.getValue().getValue()){ - value.add(wa.get()); - } - aggResultMap.put(key, value); - } - } catch (UnsupportedEncodingException e) { - LOG.error(QueryConstants.CHARSET +" not support: "+e.getMessage(),e); - } - return aggResultMap; - } - } - - private class FlatGroupAggregateQueryReader extends GroupAggregateQueryReader{ - public FlatGroupAggregateQueryReader(GenericAggregateReader reader, GenericAggregateQuery query) { - super(reader,query); - } - @Override - public List<Map.Entry<List<String>, List<Double>>> result() throws Exception { - Map<List<String>, List<Double>> aggResultMap = this.keyValuesToMap(this.reader.read()); - if(this.query.sortOptions == null) - return new ArrayList<Map.Entry<List<String>, List<Double>>>(aggResultMap.entrySet()); - if(LOG.isDebugEnabled()) LOG.debug("Flat sorting"); - return PostFlatAggregateSort.sort(aggResultMap, this.query.sortOptions, this.query.top); - } - } - - private class TimeSeriesGroupAggregateQueryReader extends GroupAggregateQueryReader{ - private final Date start; - private final Date end; - private final int pointsNum; - private final int aggFuncNum; - private final List<SortOption> sortOptions; - private final List<AggregateFunctionType> sortFuncs; - private final int sortAggFuncNum; - - public TimeSeriesGroupAggregateQueryReader(GenericAggregateReader reader, GenericAggregateQuery query) throws IOException { - super(reader,query); - try { - if(entityDef.isTimeSeries()){ - this.start = DateTimeUtil.humanDateToDate(searchCondition.getStartTime()); - this.end = DateTimeUtil.humanDateToDate(searchCondition.getEndTime()); - }else{ - start = DateTimeUtil.humanDateToDate(EntityConstants.FIXED_READ_START_HUMANTIME); - end = DateTimeUtil.humanDateToDate(EntityConstants.FIXED_READ_END_HUMANTIME); - } - 
this.pointsNum = (int)((end.getTime()-1-start.getTime())/this.query.aggregateCondition.getIntervalMS() + 1); - this.aggFuncNum = this.query.aggFuncNum; - this.sortOptions = this.query.sortOptions; - this.sortFuncs = this.query.sortFuncs; - this.sortAggFuncNum = this.query.sortAggFuncNum; - } catch (Exception e) { - throw new IOException(e); - } - } - - /** - * <h2>TimeSeriesReader result</h2> - * <ol> - * <li>generateTimeSeriesDataPoints()</li> - * <li>if not sort options, return generate time series data points</li> - * <li>if requiring sort, sort time series data points by order of flat aggregation</li> - * </ol> - * - * <h2>Time Series Sort Algorithms</h2> - * <ol> - * <li>Flat aggregate on grouped fields without time series bucket index</li> - * <li>Flat aggregated result according given sortOptions</li> - * <li>Sort Time Series Result according the same order of flat aggregated keys</li> - * </ol> - * - * @see #convertToTimeSeriesDataPoints(java.util.List) - * - * @return - * @throws Exception - */ - @Override - public List<Map.Entry<List<String>, List<double[]>>> result() throws Exception { - List<GroupbyKeyValue> result = this.reader.read(); - - // aggregated data points only - Map<List<String>,List<double[]>> timeseriesDataPoints = convertToTimeSeriesDataPoints(result); - - if(this.query.sortOptions == null) - // return time-series data points without sort - return new ArrayList<Map.Entry<List<String>, List<double[]>>>(timeseriesDataPoints.entrySet()); - - LOG.info("Time series sorting"); - - // Time Series Sort Steps - // ====================== - // 1. Flat aggregate on grouped fields without time series bucket index - // 2. Flat aggregated result according given sortOptions - // 3. Sort Time Series Result according flat aggregated keys' order - - // 1. 
Flat aggregate on grouped fields without time series bucket index - AggregateResultCallbackImpl callback = new AggregateResultCallbackImpl(this.sortFuncs); - for(GroupbyKeyValue kv:result){ - ArrayList<BytesWritable> copykey = new ArrayList<BytesWritable>(kv.getKey().getValue()); - // remove time series bucket index - copykey.remove(copykey.size()-1); - GroupbyKey key = new GroupbyKey(); - - // [this.aggFuncNum,this.aggFuncNum + this.sortFuncNum) - GroupbyValue value = new GroupbyValue(); - for(int i = this.aggFuncNum;i<this.aggFuncNum+this.sortAggFuncNum;i++){ - value.add(kv.getValue().get(i)); - value.addMeta(kv.getValue().getMeta(i)); - } - key.addAll(copykey); - GroupbyKeyValue keyValue = new GroupbyKeyValue(key,value); - callback.update(keyValue); - } - AggregateResult callbackResult = callback.result(); - Map<List<String>, List<Double>> mapForSort = this.keyValuesToMap(callbackResult.getKeyValues()); - - // 2. Flat aggregated result according given sortOptions -// List<Map.Entry<List<String>, List<Double>>> flatSort = PostFlatAggregateSort.sort(mapForSort , this.sortOptions, Integer.MAX_VALUE); -// mapForSort = new HashMap<List<String>, List<Double>>(); -// for(Map.Entry<List<String>, List<Double>> entry:flatSort){ -// mapForSort.put(entry.getKey(),entry.getValue()); -// } - - // 3. 
Sort Time Series Result according flat aggregated keys' order - return TimeSeriesPostFlatAggregateSort.sort(mapForSort,timeseriesDataPoints,this.sortOptions,this.query.top); - } - - /** - * Convert raw GroupbyKeyValue list into time-series data points hash map - * - * @param result <code>List<GroupbyKeyValue></code> - * @return Map<List<String>,List<double[]>> - * @throws Exception - */ - private Map<List<String>,List<double[]>> convertToTimeSeriesDataPoints(List<GroupbyKeyValue> result) throws Exception { - Map<List<String>, List<Double>> aggResultMap = this.keyValuesToMap(result); - Map<List<String>,List<double[]>> timeseriesDataPoints = TimeSeriesAggregator.toMetric(aggResultMap,this.pointsNum,this.aggFuncNum); - return timeseriesDataPoints; - } - } - - /** - * Get last / max timestamp - * - * @return lastTimestamp - */ - @Override - public long getLastTimestamp() { - return this.lastTimestamp; - } - - /** - * Get first / min timestamp - * - * @return firstTimestamp - */ - @Override - public long getFirstTimeStamp() { - return this.firstTimestamp; - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0ea130ef/eagle-core/eagle-query/eagle-storage-hbase/src/main/java/org/apache/eagle/storage/hbase/query/aggregate/GenericAggregateReader.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-query/eagle-storage-hbase/src/main/java/org/apache/eagle/storage/hbase/query/aggregate/GenericAggregateReader.java b/eagle-core/eagle-query/eagle-storage-hbase/src/main/java/org/apache/eagle/storage/hbase/query/aggregate/GenericAggregateReader.java deleted file mode 100755 index 3d0fa94..0000000 --- a/eagle-core/eagle-query/eagle-storage-hbase/src/main/java/org/apache/eagle/storage/hbase/query/aggregate/GenericAggregateReader.java +++ /dev/null @@ -1,131 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.storage.hbase.query.aggregate; - -import org.apache.eagle.log.entity.AbstractHBaseLogReader; -import org.apache.eagle.log.entity.meta.EntityDefinition; -import org.apache.eagle.query.aggregate.AggregateCondition; -import org.apache.eagle.storage.hbase.query.coprocessor.impl.AggregateClientImpl; -import org.apache.eagle.query.aggregate.raw.GroupbyKeyValue; -import org.apache.eagle.storage.hbase.query.coprocessor.AggregateClient; -import org.apache.eagle.storage.hbase.query.coprocessor.AggregateResult; -import org.apache.hadoop.hbase.client.HTableInterface; -import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.filter.Filter; - -import java.io.IOException; -import java.util.Date; -import java.util.List; - -/** - * @since : 11/7/14,2014 - */ -public class GenericAggregateReader extends AbstractHBaseLogReader<List<GroupbyKeyValue>> { - private final long startTime; - private final long endTime; - private AggregateClient aggregateClient = new AggregateClientImpl(); - private EntityDefinition ed; - private final AggregateCondition aggregateCondition; - private AggregateResult result; - - /** - * - * @param ed Entity Definition - * @param partitions Partition values - * @param startTime Start time - * @param endTime End time - * @param 
filter HBase filter for scanning - * @param lastScanKey Last HBase scan row key in String - * @param outputQualifiers HBase output qualifiers in bytes - * @param condition GroupAggregateCondition Object - * - * @see org.apache.eagle.query.aggregate.AggregateCondition - */ - @SuppressWarnings("unused") - private GenericAggregateReader(EntityDefinition ed, - List<String> partitions, - Date startTime, - Date endTime, - Filter filter, - String lastScanKey, - byte[][] outputQualifiers, - AggregateCondition condition) { - super(ed, partitions, startTime, endTime, filter, lastScanKey, outputQualifiers); - this.ed = ed; - this.startTime = startTime.getTime(); - this.endTime = endTime.getTime(); - this.aggregateCondition = condition; - } - - /** - * - * @param ed Entity Definition - * @param partitions Partition values - * @param startTime Start time - * @param endTime End time - * @param filter HBase filter for scanning - * @param lastScanKey Last HBase scan row key in String - * @param outputQualifiers HBase output qualifiers in bytes - * @param prefix HBase prefix, not necessary except for GenericMetric query - * @param condition GroupAggregateCondition Object - * - * @see org.apache.eagle.query.aggregate.AggregateCondition - */ - public GenericAggregateReader(EntityDefinition ed, - List<String> partitions, - Date startTime, - Date endTime, - Filter filter, - String lastScanKey, - byte[][] outputQualifiers, - String prefix, - AggregateCondition condition) { - super(ed, partitions, startTime, endTime, filter, lastScanKey, outputQualifiers, prefix); - this.ed = ed; - this.startTime = startTime.getTime(); - this.endTime = endTime.getTime(); - this.aggregateCondition = condition; - } - - @Override - protected void onOpen(HTableInterface tbl, Scan scan) throws IOException { - this.result = this.aggregateClient.aggregate( - tbl, - this.ed, - scan, - this.aggregateCondition.getGroupbyFields(), - this.aggregateCondition.getAggregateFunctionTypes(), - 
this.aggregateCondition.getAggregateFields(), - this.aggregateCondition.isTimeSeries(), - this.startTime, - this.endTime, - this.aggregateCondition.getIntervalMS()); - } - - @Override - public List<GroupbyKeyValue> read() throws IOException { - return this.result.getKeyValues(); - } - - public long getFirstTimestamp() { - return this.result.getStartTimestamp(); - } - - public long getLastTimestamp() { - return this.result.getStopTimestamp(); - } -} \ No newline at end of file