[
https://issues.apache.org/jira/browse/METRON-817?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15953583#comment-15953583
]
ASF GitHub Bot commented on METRON-817:
---------------------------------------
Github user cestella commented on a diff in the pull request:
https://github.com/apache/incubator-metron/pull/505#discussion_r109433891
--- Diff:
metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/HdfsWriter.java
---
@@ -74,17 +91,43 @@ public BulkWriterResponse write(String sourceType
) throws Exception
{
BulkWriterResponse response = new BulkWriterResponse();
- SourceHandler handler =
getSourceHandler(configurations.getIndex(sourceType));
+ // Currently treating all the messages in a group for pass/failure.
try {
- handler.handle(messages);
- } catch(Exception e) {
+ // Messages can all result in different HDFS paths, because of
Stellar Expressions, so we'll need to iterate through
+ for(JSONObject message : messages) {
+ Map<String, Object> val =
configurations.getSensorConfig(sourceType);
+ String path = getHdfsPathExtension(
+ sourceType,
+
(String)configurations.getSensorConfig(sourceType).getOrDefault(IndexingConfigurations.OUTPUT_PATH_FUNCTION_CONF,
""),
+ message
+ );
+ SourceHandler handler = getSourceHandler(sourceType, path);
+ handler.handle(message);
+ }
+ } catch (Exception e) {
response.addAllErrors(e, tuples);
}
response.addAllSuccesses(tuples);
return response;
}
+ public String getHdfsPathExtension(String sourceType, String
stellarFunction, JSONObject message) {
+ // If no function is provided, just use the sourceType directly
+ if(stellarFunction == null || stellarFunction.trim().isEmpty()) {
+ return sourceType;
+ }
+
+ StellarCompiler.Expression expression =
sourceTypeExpressionMap.computeIfAbsent(stellarFunction, s ->
stellarProcessor.compile(stellarFunction));
+ VariableResolver resolver = new MapVariableResolver(message);
--- End diff --
So, is there a reason why this isn't just:
```
//processor is a StellarProcessor();
VariableResolver resolver = new MapVariableResolver(message);
Object objResult = processor.parse(stellarFunction, resolver,
StellarFunctions.FUNCTION_RESOLVER(), Context.EMPTY_CONTEXT());
if(!objResult instanceof String) {
throw new IllegalArgumentException("Stellar Function <" + stellarFunction
+ "> did not return a String value. Returned: " + objResult);
}
return objResult == null?"":(String)objResult;
```
> Customise output file path patterns for HDFS indexing
> -----------------------------------------------------
>
> Key: METRON-817
> URL: https://issues.apache.org/jira/browse/METRON-817
> Project: Metron
> Issue Type: Improvement
> Reporter: Justin Leet
> Assignee: Justin Leet
>
> We need to be able to customize the filepaths for HDFS indexing, to allow
> fields to be part of the naming. For example, if I have a 'tenant' field, I
> should be able to direct it to a path
> {code}
> /apps/metron/.../{tenant}/{sensor}/{date}/filename-34324-432434.json
> {code}
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)