Repository: metron Updated Branches: refs/heads/master e82139189 -> c559ed7e1
METRON-1379: Add an OBJECT_GET stellar function closes apache/incubator-metron#880 Project: http://git-wip-us.apache.org/repos/asf/metron/repo Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/c559ed7e Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/c559ed7e Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/c559ed7e Branch: refs/heads/master Commit: c559ed7e1838ec71344eae3d9e37771db2641635 Parents: e821391 Author: cstella <[email protected]> Authored: Tue Jan 9 15:28:47 2018 -0500 Committer: cstella <[email protected]> Committed: Tue Jan 9 15:28:47 2018 -0500 ---------------------------------------------------------------------- .../metron/enrichment/stellar/ObjectGet.java | 156 +++++++++++++++++++ .../enrichment/stellar/ObjectGetTest.java | 90 +++++++++++ metron-stellar/stellar-common/README.md | 9 ++ 3 files changed, 255 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/metron/blob/c559ed7e/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/stellar/ObjectGet.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/stellar/ObjectGet.java b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/stellar/ObjectGet.java new file mode 100644 index 0000000..ebb94da --- /dev/null +++ b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/stellar/ObjectGet.java @@ -0,0 +1,156 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.metron.enrichment.stellar; + +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.CacheLoader; +import com.google.common.cache.LoadingCache; +import org.apache.commons.io.IOUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.metron.common.utils.SerDeUtils; +import org.apache.metron.stellar.common.utils.ConversionUtils; +import org.apache.metron.stellar.dsl.Context; +import org.apache.metron.stellar.dsl.ParseException; +import org.apache.metron.stellar.dsl.Stellar; +import org.apache.metron.stellar.dsl.StellarFunction; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.BufferedInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.lang.invoke.MethodHandles; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +@Stellar(namespace="OBJECT" + ,name="GET" + ,description="Retrieve and deserialize a serialized object from HDFS. " + + "The cache can be specified via two properties in the global config: " + + "\"" + ObjectGet.OBJECT_CACHE_SIZE_KEY + "\" (default " + ObjectGet.OBJECT_CACHE_SIZE_DEFAULT + ")," + + "\"" + ObjectGet.OBJECT_CACHE_EXPIRATION_KEY+ "\" (default 1440). Note, if these are changed in global config, " + + "topology restart is required." + , params = { + "path - The path in HDFS to the serialized object" + } + , returns="The deserialized object." +) +public class ObjectGet implements StellarFunction { + private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + public static final String OBJECT_CACHE_SIZE_KEY = "object.cache.size"; + public static final String OBJECT_CACHE_EXPIRATION_KEY = "object.cache.expiration.minutes"; + public static final int OBJECT_CACHE_SIZE_DEFAULT = 1000; + public static final long OBJECT_CACHE_EXPIRATION_MIN_DEFAULT = TimeUnit.HOURS.toMinutes(24); + protected static LoadingCache<String, Object> cache; + private static ReadWriteLock lock = new ReentrantReadWriteLock(); + + public static class Loader extends CacheLoader<String, Object> { + FileSystem fs; + public Loader(Configuration hadoopConfig) throws IOException { + this.fs = FileSystem.get(hadoopConfig); + } + @Override + public Object load(String s) throws Exception { + if(StringUtils.isEmpty(s)) { + return null; + } + Path p = new Path(s); + if(fs.exists(p)) { + try(InputStream is = new BufferedInputStream(fs.open(p))) { + byte[] serialized = IOUtils.toByteArray(is); + if(serialized.length > 0) { + Object ret = SerDeUtils.fromBytes(serialized, Object.class); + return ret; + } + } + } + return null; + } + } + + @Override + public Object apply(List<Object> args, Context context) throws ParseException { + if(!isInitialized()) { + return null; + } + if(args.size() < 1) { + return null; + } + Object o = args.get(0); + if(o == null) { + return null; + } + if(o instanceof String) { + try { + return cache.get((String)o); + } catch (ExecutionException e) { + throw new IllegalStateException("Unable to retrieve " + o + " because " + e.getMessage(), e); + } + } + else { + throw new IllegalStateException("Unable to retrieve " + o + " as it is not a path"); + } + } + + @Override + public void initialize(Context context) { + try { + lock.writeLock().lock(); + Map<String, Object> config = getConfig(context); + long size = ConversionUtils.convert(config.getOrDefault(OBJECT_CACHE_SIZE_KEY, OBJECT_CACHE_SIZE_DEFAULT), Long.class); + long expiryMin = ConversionUtils.convert(config.getOrDefault(OBJECT_CACHE_EXPIRATION_KEY, OBJECT_CACHE_EXPIRATION_MIN_DEFAULT), Long.class); + cache = setupCache(size, expiryMin); + } catch (IOException e) { + throw new IllegalStateException("Unable to initialize: " + e.getMessage(), e); + } finally { + lock.writeLock().unlock(); + } + } + + @Override + public boolean isInitialized() { + try { + lock.readLock().lock(); + return cache != null; + } + finally { + lock.readLock().unlock(); + } + } + + protected LoadingCache<String, Object> setupCache(long size, long expiryMin) throws IOException { + return CacheBuilder.newBuilder() + .maximumSize(size) + .expireAfterAccess(expiryMin, TimeUnit.MINUTES) + .build(new Loader(new Configuration())); + } + + protected Map<String, Object> getConfig(Context context) { + return (Map<String, Object>) context.getCapability(Context.Capabilities.GLOBAL_CONFIG, false).orElse(new HashMap<>()); + } +} http://git-wip-us.apache.org/repos/asf/metron/blob/c559ed7e/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/stellar/ObjectGetTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/stellar/ObjectGetTest.java b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/stellar/ObjectGetTest.java new file mode 100644 index 0000000..400dfb8 --- /dev/null +++ b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/stellar/ObjectGetTest.java @@ -0,0 +1,90 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.metron.enrichment.stellar; + +import com.google.common.collect.ImmutableMap; +import org.apache.commons.io.IOUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.metron.common.utils.SerDeUtils; +import org.apache.metron.stellar.common.utils.StellarProcessorUtils; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.io.BufferedOutputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; + +public class ObjectGetTest { + FileSystem fs; + List<String> data; + + @Before + public void setup() throws IOException { + fs = FileSystem.get(new Configuration()); + data = new ArrayList<>(); + { + data.add("apache"); + data.add("metron"); + data.add("is"); + data.add("great"); + } + + } + + @Test + public void test() throws Exception { + String filename = "target/ogt/test.ser"; + Assert.assertTrue(ObjectGet.cache == null || !ObjectGet.cache.asMap().containsKey(filename)); + assertDataIsReadCorrectly(filename); + } + + public void assertDataIsReadCorrectly(String filename) throws IOException { + try(BufferedOutputStream bos = new BufferedOutputStream(fs.create(new Path(filename), true))) { + IOUtils.write(SerDeUtils.toBytes(data), bos); + } + List<String> readData = (List<String>) StellarProcessorUtils.run("OBJECT_GET(loc)", ImmutableMap.of("loc", filename)); + Assert.assertEquals(readData, data); + Assert.assertTrue(ObjectGet.cache.asMap().containsKey(filename)); + } + + @Test + public void testMultithreaded() throws Exception { + String filename = "target/ogt/testmulti.ser"; + Assert.assertTrue(ObjectGet.cache == null || !ObjectGet.cache.asMap().containsKey(filename)); + Thread[] ts = new Thread[10]; + for(int i = 0;i < ts.length;++i) { + ts[i] = new Thread(() -> { + try { + assertDataIsReadCorrectly(filename); + } catch (IOException e) { + throw new IllegalStateException(e.getMessage(), e); + } + }); + ts[i].start(); + } + for(Thread t : ts) { + t.join(); + } + } +} http://git-wip-us.apache.org/repos/asf/metron/blob/c559ed7e/metron-stellar/stellar-common/README.md ---------------------------------------------------------------------- diff --git a/metron-stellar/stellar-common/README.md b/metron-stellar/stellar-common/README.md index 6c420b6..2ef81e8 100644 --- a/metron-stellar/stellar-common/README.md +++ b/metron-stellar/stellar-common/README.md @@ -221,6 +221,7 @@ Where: | [ `MULTISET_MERGE`](#multiset_merge) | | [ `MULTISET_REMOVE`](#multiset_remove) | | [ `MULTISET_TO_SET`](#multiset_to_set) | +| [ `OBJECT_GET`](#object_get) | | [ `PREPEND_IF_MISSING`](#prepend_if_missing) | | [ `PROFILE_GET`](#profile_get) | | [ `PROFILE_FIXED`](#profile_fixed) | @@ -805,6 +806,14 @@ Where: * multiset - The multiset to convert. * Returns: The set of objects in the multiset ignoring multiplicity +### `OBJECT_GET` + * Description: Retrieve and deserialize a serialized object from HDFS. The cache can be specified via two properties + in the global config: "object.cache.size" (default 1000), "object.cache.expiration.minutes" (default 1440). Note, if + these are changed in global config, topology restart is required. + * Input: + * path - The path in HDFS to the serialized object + * Returns: The deserialized object. + ### `PREPEND_IF_MISSING` * Description: Prepends the prefix to the start of the string if the string does not already start with any of the prefixes. * Input:
