Merge remote-tracking branch 'origin/master' into feature/METRON-1416-upgrade-solr
Project: http://git-wip-us.apache.org/repos/asf/metron/repo Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/d0a4e4c0 Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/d0a4e4c0 Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/d0a4e4c0 Branch: refs/heads/master Commit: d0a4e4c0f15d6b05371e97be19177729b1b33243 Parents: f8d7843 2b4f0b8 Author: merrimanr <[email protected]> Authored: Thu Apr 26 14:44:44 2018 -0500 Committer: merrimanr <[email protected]> Committed: Fri Apr 27 14:26:43 2018 -0500 ---------------------------------------------------------------------- dependencies_with_url.csv | 7 +- dev-utilities/committer-utils/README.md | 75 +-- dev-utilities/committer-utils/prepare-commit | 77 ++- .../client/stellar/ProfilerFunctions.java | 14 +- .../profiler/DefaultMessageDistributor.java | 235 +++++++- .../metron/profiler/DefaultProfileBuilder.java | 115 ++-- .../metron/profiler/MessageDistributor.java | 48 +- .../apache/metron/profiler/MessageRoute.java | 19 +- .../apache/metron/profiler/MessageRouter.java | 11 +- .../apache/metron/profiler/ProfileBuilder.java | 34 +- .../metron/profiler/ProfileMeasurement.java | 6 +- .../metron/profiler/StandAloneProfiler.java | 100 +++- .../org/apache/metron/profiler/clock/Clock.java | 18 +- .../metron/profiler/clock/ClockFactory.java | 38 ++ .../profiler/clock/DefaultClockFactory.java | 57 ++ .../metron/profiler/clock/EventTimeClock.java | 72 +++ .../metron/profiler/clock/FixedClock.java | 39 +- .../profiler/clock/FixedClockFactory.java | 44 ++ .../apache/metron/profiler/clock/WallClock.java | 17 +- .../profiler/DefaultMessageDistributorTest.java | 171 +++++- .../profiler/DefaultProfileBuilderTest.java | 119 ++-- .../metron/profiler/ProfilePeriodTest.java | 1 - .../metron/profiler/StandAloneProfilerTest.java | 255 ++++++++ .../profiler/clock/DefaultClockFactoryTest.java | 75 +++ .../profiler/clock/EventTimeClockTest.java | 115 ++++ .../metron/profiler/clock/WallClockTest.java | 54 ++ 
metron-analytics/metron-profiler/README.md | 108 +++- .../src/main/config/profiler.properties | 14 +- .../src/main/flux/profiler/remote.yaml | 50 +- .../profiler/bolt/DestinationHandler.java | 56 -- .../bolt/FixedFrequencyFlushSignal.java | 135 +++++ .../metron/profiler/bolt/FlushSignal.java | 51 ++ .../profiler/bolt/HBaseDestinationHandler.java | 58 -- .../metron/profiler/bolt/HBaseEmitter.java | 73 +++ .../profiler/bolt/KafkaDestinationHandler.java | 110 ---- .../metron/profiler/bolt/KafkaEmitter.java | 164 +++++ .../metron/profiler/bolt/ManualFlushSignal.java | 54 ++ .../profiler/bolt/ProfileBuilderBolt.java | 404 ++++++++++--- .../bolt/ProfileMeasurementEmitter.java | 59 ++ .../profiler/bolt/ProfileSplitterBolt.java | 136 ++++- .../zookeeper/event-time-test/profiler.json | 12 + .../config/zookeeper/percentiles/profiler.json | 12 - .../processing-time-test/profiler.json | 11 + .../zookeeper/readme-example-1/profiler.json | 17 - .../zookeeper/readme-example-2/profiler.json | 18 - .../zookeeper/readme-example-3/profiler.json | 11 - .../zookeeper/readme-example-4/profiler.json | 11 - .../bolt/FixedFrequencyFlushSignalTest.java | 71 +++ .../metron/profiler/bolt/HBaseEmitterTest.java | 120 ++++ .../bolt/KafkaDestinationHandlerTest.java | 203 ------- .../metron/profiler/bolt/KafkaEmitterTest.java | 291 +++++++++ .../profiler/bolt/ProfileBuilderBoltTest.java | 468 ++++++++------- .../profiler/bolt/ProfileHBaseMapperTest.java | 6 +- .../profiler/bolt/ProfileSplitterBoltTest.java | 288 +++++++-- .../profiler/integration/MessageBuilder.java | 75 +++ .../integration/ProfilerIntegrationTest.java | 329 +++++----- metron-contrib/metron-performance/README.md | 205 +++++++ .../performance_measurement.png | Bin 0 -> 5790 bytes metron-contrib/metron-performance/pom.xml | 134 +++++ .../src/main/assembly/assembly.xml | 42 ++ .../metron/performance/load/LoadGenerator.java | 175 ++++++ .../metron/performance/load/LoadOptions.java | 499 ++++++++++++++++ 
.../performance/load/MessageGenerator.java | 48 ++ .../metron/performance/load/SendToKafka.java | 107 ++++ .../load/monitor/AbstractMonitor.java | 49 ++ .../load/monitor/EPSGeneratedMonitor.java | 53 ++ .../monitor/EPSThroughputWrittenMonitor.java | 77 +++ .../performance/load/monitor/MonitorNaming.java | 23 + .../performance/load/monitor/MonitorTask.java | 44 ++ .../performance/load/monitor/Results.java | 51 ++ .../load/monitor/writers/CSVWriter.java | 67 +++ .../load/monitor/writers/ConsoleWriter.java | 65 ++ .../load/monitor/writers/Writable.java | 40 ++ .../load/monitor/writers/Writer.java | 86 +++ .../performance/sampler/BiasedSampler.java | 113 ++++ .../metron/performance/sampler/Sampler.java | 24 + .../performance/sampler/UnbiasedSampler.java | 28 + .../metron/performance/util/KafkaUtil.java | 56 ++ .../src/main/scripts/load_tool.sh | 36 ++ .../performance/load/LoadOptionsTest.java | 93 +++ .../performance/load/SendToKafkaTest.java | 49 ++ .../metron/performance/sampler/SamplerTest.java | 145 +++++ metron-contrib/pom.xml | 15 + metron-deployment/Kerberos-manual-setup.md | 209 +++++++ metron-deployment/amazon-ec2/README.md | 90 +-- metron-deployment/amazon-ec2/playbook.yml | 4 +- .../ansible/playbooks/metron_full_install.yml | 4 +- .../roles/ambari_master/defaults/main.yml | 2 + .../ambari_master/tasks/elasticsearch_mpack.yml | 26 + .../ansible/roles/ambari_master/tasks/main.yml | 3 +- .../roles/ambari_master/tasks/metron_mpack.yml | 26 + .../ansible/roles/ambari_master/tasks/mpack.yml | 26 - .../roles/load_web_templates/tasks/main.yml | 2 +- .../roles/metron-builder/tasks/build-debs.yml | 2 +- .../roles/metron-builder/tasks/build-rpms.yml | 2 +- metron-deployment/development/README.md | 5 + metron-deployment/development/centos6/README.md | 4 +- .../development/centos6/Vagrantfile | 22 +- .../development/centos6/ansible/playbook.yml | 23 + .../development/ubuntu14/README.md | 4 +- .../development/ubuntu14/Vagrantfile | 16 +- 
.../manual-install/Manual_Install_CentOS6.md | 4 +- metron-deployment/packaging/ambari/README.md | 193 +++--- .../ambari/elasticsearch-mpack/README.md | 62 ++ .../ambari/elasticsearch-mpack/pom.xml | 95 +++ .../src/main/assemblies/elasticsearch-mpack.xml | 43 ++ .../ELASTICSEARCH/5.6.2/metainfo.xml | 29 + .../ELASTICSEARCH/5.6.2/repos/repoinfo.xml | 45 ++ .../addon-services/KIBANA/5.6.2/metainfo.xml | 30 + .../KIBANA/5.6.2/quicklinks/quicklinks.json | 27 + .../KIBANA/5.6.2/repos/repoinfo.xml | 60 ++ .../5.6.2/configuration/elastic-env.xml | 86 +++ .../5.6.2/configuration/elastic-jvm-options.xml | 144 +++++ .../5.6.2/configuration/elastic-site.xml | 198 +++++++ .../5.6.2/configuration/elastic-sysconfig.xml | 97 +++ .../5.6.2/configuration/elastic-systemd.xml | 30 + .../ELASTICSEARCH/5.6.2/metainfo.xml | 97 +++ .../5.6.2/package/scripts/elastic_commands.py | 266 +++++++++ .../5.6.2/package/scripts/elastic_master.py | 72 +++ .../5.6.2/package/scripts/elastic_slave.py | 71 +++ .../5.6.2/package/scripts/params.py | 108 ++++ .../5.6.2/package/scripts/properties_config.py | 34 ++ .../5.6.2/package/scripts/service_check.py | 114 ++++ .../5.6.2/package/scripts/status_params.py | 27 + .../templates/elasticsearch.master.yaml.j2 | 77 +++ .../templates/elasticsearch.slave.yaml.j2 | 78 +++ .../templates/elasticsearch_limits.conf.j2 | 20 + .../5.6.2/quicklinks/quicklinks.json | 43 ++ .../ELASTICSEARCH/5.6.2/role_command_order.json | 8 + .../KIBANA/5.6.2/configuration/kibana-env.xml | 72 +++ .../KIBANA/5.6.2/configuration/kibana-site.xml | 113 ++++ .../common-services/KIBANA/5.6.2/metainfo.xml | 84 +++ .../KIBANA/5.6.2/package/scripts/common.py | 56 ++ .../5.6.2/package/scripts/kibana_master.py | 81 +++ .../KIBANA/5.6.2/package/scripts/params.py | 50 ++ .../KIBANA/5.6.2/quicklinks/quicklinks.json | 28 + .../src/main/resources/mpack.json | 76 +++ .../packaging/ambari/metron-mpack/README.md | 20 +- .../packaging/ambari/metron-mpack/pom.xml | 7 +- 
.../src/main/assemblies/metron-mpack.xml | 14 - .../ELASTICSEARCH/5.6.2/metainfo.xml | 29 - .../ELASTICSEARCH/5.6.2/repos/repoinfo.xml | 45 -- .../addon-services/KIBANA/5.6.2/metainfo.xml | 30 - .../KIBANA/5.6.2/quicklinks/quicklinks.json | 27 - .../KIBANA/5.6.2/repos/repoinfo.xml | 60 -- .../5.6.2/configuration/elastic-env.xml | 86 --- .../5.6.2/configuration/elastic-jvm-options.xml | 144 ----- .../5.6.2/configuration/elastic-site.xml | 198 ------- .../5.6.2/configuration/elastic-sysconfig.xml | 97 --- .../5.6.2/configuration/elastic-systemd.xml | 30 - .../ELASTICSEARCH/5.6.2/metainfo.xml | 97 --- .../5.6.2/package/scripts/elastic_commands.py | 266 --------- .../5.6.2/package/scripts/elastic_master.py | 72 --- .../5.6.2/package/scripts/elastic_slave.py | 71 --- .../5.6.2/package/scripts/params.py | 108 ---- .../5.6.2/package/scripts/properties_config.py | 34 -- .../5.6.2/package/scripts/service_check.py | 114 ---- .../5.6.2/package/scripts/status_params.py | 27 - .../templates/elasticsearch.master.yaml.j2 | 77 --- .../templates/elasticsearch.slave.yaml.j2 | 78 --- .../templates/elasticsearch_limits.conf.j2 | 20 - .../5.6.2/quicklinks/quicklinks.json | 43 -- .../ELASTICSEARCH/5.6.2/role_command_order.json | 8 - .../KIBANA/5.6.2/configuration/kibana-env.xml | 72 --- .../KIBANA/5.6.2/configuration/kibana-site.xml | 113 ---- .../common-services/KIBANA/5.6.2/metainfo.xml | 94 --- .../KIBANA/5.6.2/package/scripts/common.py | 56 -- .../5.6.2/package/scripts/dashboard/__init__.py | 16 - .../scripts/dashboard/dashboard-bulkload.json | 88 --- .../package/scripts/dashboard/dashboardindex.py | 95 --- .../package/scripts/dashboard/kibana.template | 233 -------- .../5.6.2/package/scripts/kibana_master.py | 119 ---- .../KIBANA/5.6.2/package/scripts/params.py | 50 -- .../KIBANA/5.6.2/quicklinks/quicklinks.json | 28 - .../configuration/metron-enrichment-env.xml | 130 +++- .../METRON/CURRENT/configuration/metron-env.xml | 2 - .../configuration/metron-profiler-env.xml | 77 ++- 
.../CURRENT/configuration/metron-rest-env.xml | 12 + .../common-services/METRON/CURRENT/metainfo.xml | 20 + .../package/scripts/dashboard/__init__.py | 16 + .../scripts/dashboard/dashboard-bulkload.json | 88 +++ .../package/scripts/dashboard/dashboardindex.py | 95 +++ .../package/scripts/dashboard/kibana.template | 233 ++++++++ .../package/scripts/enrichment_commands.py | 20 +- .../package/scripts/enrichment_master.py | 12 +- .../package/scripts/indexing_commands.py | 43 +- .../CURRENT/package/scripts/indexing_master.py | 32 + .../CURRENT/package/scripts/metron_service.py | 10 + .../package/scripts/params/params_linux.py | 33 +- .../package/scripts/params/status_params.py | 7 +- .../CURRENT/package/scripts/rest_commands.py | 69 ++- .../CURRENT/package/scripts/rest_master.py | 16 +- .../enrichment-splitjoin.properties.j2 | 63 ++ .../templates/enrichment-unified.properties.j2 | 60 ++ .../package/templates/profiler.properties.j2 | 15 +- .../METRON/CURRENT/themes/metron_theme.json | 275 ++++++++- .../packaging/docker/deb-docker/pom.xml | 6 + .../docker/rpm-docker/SPECS/metron.spec | 29 +- .../packaging/docker/rpm-docker/pom.xml | 6 + .../packaging/packer-build/README.md | 2 +- metron-deployment/pom.xml | 1 + .../alert-filters/alert-filters.e2e-spec.ts | 11 +- .../meta-alerts/meta-alert.e2e-spec.ts | 5 +- .../alerts-list/tree-view/tree-view.e2e-spec.ts | 5 +- .../alert-filters/alert-filters.component.ts | 4 +- .../src/app/model/search-request.ts | 2 +- .../app/shared/group-by/group-by.component.ts | 4 +- metron-interface/metron-config/package.json | 2 +- .../apache/metron/rest/model/AlertProfile.java | 88 --- .../metron/rest/model/AlertsUIUserSettings.java | 90 +++ metron-interface/metron-rest/README.md | 172 +++--- metron-interface/metron-rest/pom.xml | 5 + .../apache/metron/rest/MetronRestConstants.java | 4 + .../apache/metron/rest/config/HBaseConfig.java | 55 ++ .../metron/rest/controller/AlertController.java | 119 ---- .../rest/controller/AlertsUIController.java | 
124 ++++ .../metron/rest/controller/UserController.java | 3 +- .../rest/repository/AlertProfileRepository.java | 25 - .../metron/rest/service/AlertService.java | 39 -- .../rest/service/AlertsProfileService.java | 32 - .../metron/rest/service/AlertsUIService.java | 41 ++ .../apache/metron/rest/service/UserService.java | 33 ++ .../rest/service/impl/AlertServiceImpl.java | 97 --- .../service/impl/AlertsProfileServiceImpl.java | 66 --- .../rest/service/impl/AlertsUIServiceImpl.java | 131 ++++ .../rest/service/impl/SearchServiceImpl.java | 22 +- .../src/main/resources/application-test.yml | 2 + .../src/main/resources/application.yml | 9 +- .../metron-rest/src/main/scripts/metron-rest.sh | 9 + .../metron/rest/config/HBaseConfigTest.java | 69 +++ .../apache/metron/rest/config/TestConfig.java | 26 +- .../AlertControllerIntegrationTest.java | 345 ----------- .../AlertsUIControllerIntegrationTest.java | 340 +++++++++++ .../SearchControllerIntegrationTest.java | 61 +- .../UserControllerIntegrationTest.java | 40 +- .../rest/service/impl/AlertServiceImplTest.java | 152 ----- .../service/impl/AlertsUIServiceImplTest.java | 180 ++++++ .../service/impl/SearchServiceImplTest.java | 60 +- metron-platform/Performance-tuning-guide.md | 259 +++++++- metron-platform/metron-common/README.md | 32 + .../src/main/config/zookeeper/global.json | 5 +- .../configuration/ConfigurationsUtils.java | 123 +++- .../common/configuration/FieldTransformer.java | 4 +- .../configuration/SensorParserConfig.java | 15 + .../enrichment/handler/ConfigHandler.java | 4 + .../configuration/profiler/ProfileConfig.java | 159 ++++- .../profiler/ProfileResultExpressions.java | 4 +- .../profiler/ProfileTriageExpressions.java | 8 + .../configuration/profiler/ProfilerConfig.java | 115 +++- .../transformation/FieldTransformations.java | 1 + .../transformation/RenameTransformation.java | 55 ++ .../transformation/StellarTransformation.java | 3 +- .../common/message/BytesFromPosition.java | 4 +- 
.../message/JSONFromFieldByReference.java | 37 ++ .../metron/common/message/JSONFromPosition.java | 4 +- .../metron/common/message/MessageGetters.java | 1 + .../apache/metron/common/utils/HDFSUtils.java | 59 ++ .../apache/metron/common/utils/JSONUtils.java | 11 +- .../metron/common/utils/ReflectionUtils.java | 66 ++- .../org/apache/metron/common/writer/test.json | 31 - .../configurations/ProfilerUpdater.java | 1 + .../src/main/scripts/cluster_info.py | 389 ++++++++++++ .../profiler/ProfileConfigTest.java | 107 +++- .../profiler/ProfilerConfigTest.java | 209 +++++++ .../metron/common/error/MetronErrorTest.java | 18 +- .../transformation/FieldTransformationTest.java | 17 +- .../RenameTransformationTest.java | 99 ++++ .../StellarTransformationTest.java | 30 + .../ZKConfigurationsCacheIntegrationTest.java | 4 +- .../elasticsearch/dao/ElasticsearchDao.java | 2 +- .../dao/ElasticsearchSearchDao.java | 30 +- .../elasticsearch/utils/ElasticsearchUtils.java | 107 +++- .../writer/ElasticsearchWriter.java | 8 +- .../scripts/start_elasticsearch_topology.sh | 8 +- .../writer/ElasticsearchWriterTest.java | 19 +- .../metron-enrichment/Performance.md | 514 ++++++++++++++++ metron-platform/metron-enrichment/README.md | 47 +- metron-platform/metron-enrichment/pom.xml | 6 + .../main/config/enrichment-splitjoin.properties | 63 ++ .../config/enrichment-splitjoin.properties.j2 | 63 ++ .../main/config/enrichment-unified.properties | 69 +++ .../config/enrichment-unified.properties.j2 | 60 ++ .../src/main/config/enrichment.properties | 64 -- .../src/main/config/enrichment.properties.j2 | 63 -- .../main/flux/enrichment/remote-splitjoin.yaml | 590 ++++++++++++++++++ .../main/flux/enrichment/remote-unified.yaml | 387 ++++++++++++ .../src/main/flux/enrichment/remote.yaml | 594 ------------------- .../adapters/stellar/StellarAdapter.java | 5 +- .../enrichment/bolt/EnrichmentJoinBolt.java | 4 +- .../enrichment/bolt/EnrichmentSplitterBolt.java | 4 +- .../enrichment/bolt/GenericEnrichmentBolt.java 
| 33 +- .../apache/metron/enrichment/bolt/JoinBolt.java | 34 +- .../enrichment/bolt/ThreatIntelJoinBolt.java | 119 +--- .../bolt/ThreatIntelSplitterBolt.java | 4 +- .../enrichment/bolt/UnifiedEnrichmentBolt.java | 412 +++++++++++++ .../enrichment/parallel/ConcurrencyContext.java | 96 +++ .../enrichment/parallel/EnrichmentCallable.java | 66 +++ .../enrichment/parallel/EnrichmentContext.java | 43 ++ .../parallel/EnrichmentStrategies.java | 108 ++++ .../enrichment/parallel/EnrichmentStrategy.java | 71 +++ .../enrichment/parallel/ParallelEnricher.java | 289 +++++++++ .../parallel/WorkerPoolStrategies.java | 45 ++ .../enrichment/utils/EnrichmentUtils.java | 16 + .../enrichment/utils/ThreatIntelUtils.java | 127 ++++ .../main/scripts/start_enrichment_topology.sh | 16 +- .../bolt/BulkMessageWriterBoltTest.java | 25 + .../bolt/GenericEnrichmentBoltTest.java | 2 +- .../metron/enrichment/bolt/JoinBoltTest.java | 7 +- .../integration/EnrichmentIntegrationTest.java | 113 ++-- .../UnifiedEnrichmentIntegrationTest.java | 96 +++ .../parallel/ParallelEnricherTest.java | 251 ++++++++ .../unified_enrichment_arch.svg | 14 + .../unified_enrichment_arch_diagram.xml | 14 + .../org/apache/metron/hbase/bolt/HBaseBolt.java | 22 +- .../metron/hbase/client/UserSettingsClient.java | 175 ++++++ .../hbase/client/UserSettingsClientTest.java | 101 ++++ .../apache/metron/hbase/mock/MockHTable.java | 7 +- metron-platform/metron-indexing/README.md | 24 + .../src/main/flux/indexing/batch/remote.yaml | 4 +- .../flux/indexing/random_access/remote.yaml | 4 +- .../indexing/dao/search/SearchRequest.java | 15 +- .../metron/indexing/dao/search/SortField.java | 15 + .../apache/metron/indexing/dao/InMemoryDao.java | 18 + .../integration/components/KafkaComponent.java | 39 +- .../jsonMapQuery/parsed/jsonMapExampleParsed | 2 + .../data/jsonMapQuery/raw/jsonMapExampleOutput | 1 + metron-platform/metron-management/pom.xml | 1 - .../management/ConfigurationFunctions.java | 564 ++++++++++-------- 
.../management/ConfigurationFunctionsTest.java | 424 +++++++++---- metron-platform/metron-parsers/README.md | 79 ++- metron-platform/metron-parsers/pom.xml | 5 + .../config/zookeeper/parsers/jsonMapQuery.json | 5 + .../apache/metron/parsers/bolt/ParserBolt.java | 32 +- .../metron/parsers/json/JSONMapParser.java | 145 +++-- .../parsers/topology/ParserTopologyBuilder.java | 4 +- .../metron/parsers/bolt/ParserBoltTest.java | 16 +- .../JSONMapQueryIntegrationTest.java | 36 ++ .../parsers/integration/ParserDriver.java | 6 +- .../validation/SampleDataValidation.java | 2 +- .../parsers/json/JSONMapParserQueryTest.java | 201 +++++++ .../apache/metron/solr/dao/SolrSearchDao.java | 24 +- .../metron/solr/dao/SolrSearchDaoTest.java | 8 +- .../metron/test/utils/ValidationUtils.java | 46 +- .../writer/bolt/BulkMessageWriterBolt.java | 51 +- metron-sensors/pycapa/README.md | 84 +-- metron-stellar/stellar-common/pom.xml | 5 + .../stellar/common/BaseStellarProcessor.java | 31 +- .../stellar/common/CachingStellarProcessor.java | 144 +++++ .../shell/DefaultStellarShellExecutor.java | 34 +- .../shell/specials/AssignmentCommand.java | 2 +- .../stellar/common/utils/ConversionUtils.java | 19 +- .../common/utils/StellarProcessorUtils.java | 135 +++-- .../org/apache/metron/stellar/dsl/Context.java | 43 +- .../stellar/dsl/functions/DateFunctions.java | 8 +- .../resolver/ClasspathFunctionResolver.java | 45 +- .../common/CachingStellarProcessorTest.java | 104 ++++ .../shell/DefaultStellarShellExecutorTest.java | 11 + .../shell/specials/AssignmentCommandTest.java | 14 + .../resolver/ClasspathFunctionResolverTest.java | 30 + metron-stellar/stellar-zeppelin/README.md | 80 +-- metron-stellar/stellar-zeppelin/pom.xml | 12 + .../stellar/zeppelin/StellarInterpreter.java | 95 +-- .../zeppelin/StellarInterpreterProperty.java | 79 +++ .../StellarInterpreterPropertyTest.java | 62 ++ .../zeppelin/StellarInterpreterTest.java | 60 +- .../integration/ConfigUploadComponent.java | 82 +++ 
.../StellarInterpreterIntegrationTest.java | 104 ++++ pom.xml | 1 + site-book/pom.xml | 4 +- .../src-resources/templates/site.xml.template | 6 +- site/community/index.md | 8 +- 371 files changed, 20030 insertions(+), 7283 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/metron/blob/d0a4e4c0/dependencies_with_url.csv ---------------------------------------------------------------------- diff --cc dependencies_with_url.csv index 2bf1c76,1e73eb1..0b4b16b --- a/dependencies_with_url.csv +++ b/dependencies_with_url.csv @@@ -21,8 -21,10 +21,11 @@@ com.esotericsoftware:reflectasm:jar:1.1 com.flipkart.zjsonpatch:zjsonpatch:jar:0.3.4:compile,Apache v2, https://github.com/flipkart-incubator/zjsonpatch com.google.protobuf:protobuf-java:jar:2.5.0:compile,New BSD license,http://code.google.com/p/protobuf com.google.protobuf:protobuf-java:jar:2.6.1:compile,New BSD license,http://code.google.com/p/protobuf +com.google.protobuf:protobuf-java:jar:3.1.0:compile,New BSD license,http://code.google.com/p/protobuf com.jcraft:jsch:jar:0.1.42:compile,BSD,http://www.jcraft.com/jsch/ + com.jayway.jsonpath:json-path:jar:2.3.0:compile,Apache v2,https://github.com/json-path/JsonPath + net.minidev:accessors-smart:jar:1.2:compile,Apache v2,https://github.com/netplex/json-smart-v2 + net.minidev:json-smart:jar:2.3:compile,Apache v2,https://github.com/netplex/json-smart-v2 com.maxmind.db:maxmind-db:jar:1.2.1:compile,CC-BY-SA 3.0,https://github.com/maxmind/MaxMind-DB com.maxmind.geoip2:geoip2:jar:2.8.0:compile,Apache v2,https://github.com/maxmind/GeoIP2-java com.sun.xml.bind:jaxb-impl:jar:2.2.3-1:compile,CDDL,http://jaxb.java.net/ http://git-wip-us.apache.org/repos/asf/metron/blob/d0a4e4c0/metron-deployment/ansible/playbooks/metron_full_install.yml ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/metron/blob/d0a4e4c0/metron-deployment/packaging/ambari/metron-mpack/pom.xml ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/metron/blob/d0a4e4c0/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/configuration/metron-env.xml ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/metron/blob/d0a4e4c0/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/metainfo.xml ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/metron/blob/d0a4e4c0/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/indexing_commands.py ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/metron/blob/d0a4e4c0/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/indexing_master.py ---------------------------------------------------------------------- diff --cc metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/indexing_master.py index e63ea2d,1cd6f4c..b5c4bb9 --- a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/indexing_master.py +++ b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/indexing_master.py @@@ -166,34 -153,34 +170,62 @@@ class Indexing(Script) cmd.format(params.es_http_url, template_name), logoutput=True) + def solr_schema_install(self, env): + from params import params + env.set_params(params) + Logger.info("Installing Solr schemas") + + commands = IndexingCommands(params) + for collection_name, config_path in 
commands.get_solr_schemas().iteritems(): + + # install the schema + + cmd = "{0}/bin/solr create -c {1} -d {2}" + Execute( + cmd.format(params.solr_home, collection_name, config_path), + logoutput=True, user="solr") + + def solr_schema_delete(self, env): + from params import params + env.set_params(params) + Logger.info("Deleting Solr schemas") + + commands = IndexingCommands(params) + for collection_name, config_path in commands.get_solr_schemas().iteritems(): + # delete the schema + cmd = "{0}/bin/solr delete -c {1}" + Execute( + cmd.format(params.solr_home, collection_name), + logoutput=True, user="solr") + + @OsFamilyFuncImpl(os_family=OsFamilyImpl.DEFAULT) + def kibana_dashboard_install(self, env): + from params import params + env.set_params(params) + + Logger.info("Connecting to Elasticsearch on: %s" % (params.es_http_url)) + + kibanaTemplate = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'dashboard', 'kibana.template') + if not os.path.isfile(kibanaTemplate): + raise IOError( + errno.ENOENT, os.strerror(errno.ENOENT), kibanaTemplate) + + Logger.info("Loading .kibana index template from %s" % kibanaTemplate) + template_cmd = ambari_format( + 'curl -s -XPOST http://{es_http_url}/_template/.kibana -d @%s' % kibanaTemplate) + Execute(template_cmd, logoutput=True) + + kibanaDashboardLoad = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'dashboard', 'dashboard-bulkload.json') + if not os.path.isfile(kibanaDashboardLoad): + raise IOError( + errno.ENOENT, os.strerror(errno.ENOENT), kibanaDashboardLoad) + + Logger.info("Loading .kibana dashboard from %s" % kibanaDashboardLoad) + + kibana_cmd = ambari_format( + 'curl -s -H "Content-Type: application/x-ndjson" -XPOST http://{es_http_url}/.kibana/_bulk --data-binary @%s' % kibanaDashboardLoad) + Execute(kibana_cmd, logoutput=True) + def zeppelin_notebook_import(self, env): from params import params env.set_params(params) 
http://git-wip-us.apache.org/repos/asf/metron/blob/d0a4e4c0/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/metron_service.py ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/metron/blob/d0a4e4c0/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/params/params_linux.py ---------------------------------------------------------------------- diff --cc metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/params/params_linux.py index 5bfa1dc,f44d05f..6f4760b --- a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/params/params_linux.py +++ b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/params/params_linux.py @@@ -84,7 -83,10 +83,11 @@@ indexing_hbase_configured_flag_file = s indexing_hbase_acl_configured_flag_file = status_params.indexing_hbase_acl_configured_flag_file indexing_hdfs_perm_configured_flag_file = status_params.indexing_hdfs_perm_configured_flag_file elasticsearch_template_installed_flag_file = status_params.elasticsearch_template_installed_flag_file +solr_schema_installed_flag_file = status_params.solr_schema_installed_flag_file + rest_kafka_configured_flag_file = status_params.rest_kafka_configured_flag_file + rest_kafka_acl_configured_flag_file = status_params.rest_kafka_acl_configured_flag_file + rest_hbase_configured_flag_file = status_params.rest_hbase_configured_flag_file + rest_hbase_acl_configured_flag_file = status_params.rest_hbase_acl_configured_flag_file global_properties_template = config['configurations']['metron-env']['elasticsearch-properties'] # Elasticsearch hosts and port management 
http://git-wip-us.apache.org/repos/asf/metron/blob/d0a4e4c0/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/params/status_params.py ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/metron/blob/d0a4e4c0/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/rest_commands.py ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/metron/blob/d0a4e4c0/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/themes/metron_theme.json ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/metron/blob/d0a4e4c0/metron-deployment/packaging/docker/rpm-docker/SPECS/metron.spec ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/metron/blob/d0a4e4c0/metron-interface/metron-rest/src/main/scripts/metron-rest.sh ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/metron/blob/d0a4e4c0/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/JSONUtils.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/metron/blob/d0a4e4c0/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchDao.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/metron/blob/d0a4e4c0/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchSearchDao.java ---------------------------------------------------------------------- diff --cc 
metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchSearchDao.java index 5e9ed02,0000000..3971237 mode 100644,000000..100644 --- a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchSearchDao.java +++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchSearchDao.java @@@ -1,565 -1,0 +1,567 @@@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.metron.elasticsearch.dao; + +import static org.apache.metron.elasticsearch.utils.ElasticsearchUtils.INDEX_NAME_DELIMITER; + +import com.google.common.base.Splitter; +import com.google.common.collect.Iterables; +import java.io.IOException; +import java.lang.invoke.MethodHandles; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.function.Function; +import org.apache.metron.elasticsearch.utils.ElasticsearchUtils; +import org.apache.metron.indexing.dao.AccessConfig; +import org.apache.metron.indexing.dao.search.FieldType; +import org.apache.metron.indexing.dao.search.GetRequest; +import org.apache.metron.indexing.dao.search.Group; +import org.apache.metron.indexing.dao.search.GroupOrder; +import org.apache.metron.indexing.dao.search.GroupOrderType; +import org.apache.metron.indexing.dao.search.GroupRequest; +import org.apache.metron.indexing.dao.search.GroupResponse; +import org.apache.metron.indexing.dao.search.GroupResult; +import org.apache.metron.indexing.dao.search.InvalidSearchException; +import org.apache.metron.indexing.dao.search.SearchDao; +import org.apache.metron.indexing.dao.search.SearchRequest; +import org.apache.metron.indexing.dao.search.SearchResponse; +import org.apache.metron.indexing.dao.search.SearchResult; +import org.apache.metron.indexing.dao.search.SortField; +import org.apache.metron.indexing.dao.search.SortOrder; +import org.apache.metron.indexing.dao.update.Document; +import org.elasticsearch.action.search.SearchRequestBuilder; +import org.elasticsearch.client.transport.TransportClient; +import org.elasticsearch.index.mapper.LegacyIpFieldMapper; +import org.elasticsearch.index.query.IdsQueryBuilder; +import org.elasticsearch.index.query.QueryBuilder; +import org.elasticsearch.index.query.QueryBuilders; 
+import org.elasticsearch.index.query.QueryStringQueryBuilder; +import org.elasticsearch.search.SearchHit; +import org.elasticsearch.search.SearchHits; +import org.elasticsearch.search.aggregations.Aggregation; +import org.elasticsearch.search.aggregations.AggregationBuilders; +import org.elasticsearch.search.aggregations.Aggregations; +import org.elasticsearch.search.aggregations.bucket.terms.Terms; +import org.elasticsearch.search.aggregations.bucket.terms.Terms.Bucket; +import org.elasticsearch.search.aggregations.bucket.terms.Terms.Order; +import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder; +import org.elasticsearch.search.aggregations.metrics.sum.Sum; +import org.elasticsearch.search.aggregations.metrics.sum.SumAggregationBuilder; +import org.elasticsearch.search.builder.SearchSourceBuilder; +import org.elasticsearch.search.sort.FieldSortBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ElasticsearchSearchDao implements SearchDao { + + private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + /** + * The value required to ensure that Elasticsearch sorts missing values last. + */ + private static final String SORT_MISSING_LAST = "_last"; + + /** + * The value required to ensure that Elasticsearch sorts missing values first. 
+ */ + private static final String SORT_MISSING_FIRST = "_first"; + + private transient TransportClient client; + private AccessConfig accessConfig; + private ElasticsearchColumnMetadataDao columnMetadataDao; + private ElasticsearchRequestSubmitter requestSubmitter; + + public ElasticsearchSearchDao(TransportClient client, + AccessConfig accessConfig, + ElasticsearchColumnMetadataDao columnMetadataDao, + ElasticsearchRequestSubmitter requestSubmitter) { + this.client = client; + this.accessConfig = accessConfig; + this.columnMetadataDao = columnMetadataDao; + this.requestSubmitter = requestSubmitter; + } + + @Override + public SearchResponse search(SearchRequest searchRequest) throws InvalidSearchException { + if(searchRequest.getQuery() == null) { + throw new InvalidSearchException("Search query is invalid: null"); + } + return search(searchRequest, new QueryStringQueryBuilder(searchRequest.getQuery())); + } + + @Override + public GroupResponse group(GroupRequest groupRequest) throws InvalidSearchException { + return group(groupRequest, new QueryStringQueryBuilder(groupRequest.getQuery())); + } + + @Override + public Document getLatest(String guid, String sensorType) throws IOException { + Optional<Document> doc = searchByGuid(guid, sensorType, hit -> toDocument(guid, hit)); + return doc.orElse(null); + } + + <T> Optional<T> searchByGuid(String guid, String sensorType, + Function<SearchHit, Optional<T>> callback) { + Collection<String> sensorTypes = sensorType != null ? 
Collections.singleton(sensorType) : null; + List<T> results = searchByGuids(Collections.singleton(guid), sensorTypes, callback); + if (results.size() > 0) { + return Optional.of(results.get(0)); + } else { + return Optional.empty(); + } + } + + @Override + public Iterable<Document> getAllLatest(List<GetRequest> getRequests) throws IOException { + Collection<String> guids = new HashSet<>(); + Collection<String> sensorTypes = new HashSet<>(); + for (GetRequest getRequest: getRequests) { + guids.add(getRequest.getGuid()); + sensorTypes.add(getRequest.getSensorType()); + } + List<Document> documents = searchByGuids( + guids + , sensorTypes + , hit -> { + Long ts = 0L; + String doc = hit.getSourceAsString(); + String sourceType = Iterables.getFirst(Splitter.on("_doc").split(hit.getType()), null); + try { + return Optional.of(new Document(doc, hit.getId(), sourceType, ts)); + } catch (IOException e) { + throw new IllegalStateException("Unable to retrieve latest: " + e.getMessage(), e); + } + } + + ); + return documents; + } + + /** + * Defers to a provided {@link org.elasticsearch.index.query.QueryBuilder} for the query. + * @param request The request defining the parameters of the search + * @param queryBuilder The actual query to be run. Intended for if the SearchRequest requires wrapping + * @return The results of the query + * @throws InvalidSearchException When the query is malformed or the current state doesn't allow search + */ + protected SearchResponse search(SearchRequest request, QueryBuilder queryBuilder) throws InvalidSearchException { + org.elasticsearch.action.search.SearchRequest esRequest; + org.elasticsearch.action.search.SearchResponse esResponse; + + if(client == null) { + throw new InvalidSearchException("Uninitialized Dao! 
You must call init() prior to use."); + } + + if (request.getSize() > accessConfig.getMaxSearchResults()) { + throw new InvalidSearchException("Search result size must be less than " + accessConfig.getMaxSearchResults()); + } + + esRequest = buildSearchRequest(request, queryBuilder); + esResponse = requestSubmitter.submitSearch(esRequest); + return buildSearchResponse(request, esResponse); + } + + /** + * Builds an Elasticsearch search request. + * @param searchRequest The Metron search request. + * @param queryBuilder + * @return An Elasticsearch search request. + */ + private org.elasticsearch.action.search.SearchRequest buildSearchRequest( + SearchRequest searchRequest, + QueryBuilder queryBuilder) throws InvalidSearchException { + if (LOG.isDebugEnabled()) { + LOG.debug("Got search request; request={}", ElasticsearchUtils.toJSON(searchRequest).orElse("???")); + } + SearchSourceBuilder searchBuilder = new SearchSourceBuilder() + .size(searchRequest.getSize()) + .from(searchRequest.getFrom()) + .query(queryBuilder) + .trackScores(true); - Optional<List<String>> fields = searchRequest.getFields(); ++ List<String> fields = searchRequest.getFields(); + // column metadata needed to understand the type of each sort field + Map<String, FieldType> meta; + try { + meta = columnMetadataDao.getColumnMetadata(searchRequest.getIndices()); + } catch(IOException e) { + throw new InvalidSearchException("Unable to get column metadata", e); + } + + // handle sort fields + for(SortField sortField : searchRequest.getSort()) { + + // what type is the sort field? + FieldType sortFieldType = meta.getOrDefault(sortField.getField(), FieldType.OTHER); + + // sort order - if ascending missing values sorted last. 
otherwise, missing values sorted first + org.elasticsearch.search.sort.SortOrder sortOrder = getElasticsearchSortOrder(sortField.getSortOrder()); + String missingSortOrder; + if(sortOrder == org.elasticsearch.search.sort.SortOrder.DESC) { + missingSortOrder = SORT_MISSING_LAST; + } else { + missingSortOrder = SORT_MISSING_FIRST; + } + + // sort by the field - missing fields always last + FieldSortBuilder sortBy = new FieldSortBuilder(sortField.getField()) + .order(sortOrder) + .missing(missingSortOrder) + .unmappedType(sortFieldType.getFieldType()); + searchBuilder.sort(sortBy); + } + + // handle search fields - if (fields.isPresent()) { ++ if (fields != null) { + searchBuilder.fetchSource("*", null); + } else { + searchBuilder.fetchSource(true); + } + - Optional<List<String>> facetFields = searchRequest.getFacetFields(); ++ List<String> facetFields = searchRequest.getFacetFields(); + + // handle facet fields - if (searchRequest.getFacetFields().isPresent()) { ++ if (facetFields != null) { + // https://www.elastic.co/guide/en/elasticsearch/client/java-api/current/_bucket_aggregations.html - for(String field : searchRequest.getFacetFields().get()) { ++ for(String field : facetFields) { + String name = getFacetAggregationName(field); + TermsAggregationBuilder terms = AggregationBuilders.terms( name).field(field); + // new TermsBuilder(name).field(field); + searchBuilder.aggregation(terms); + } + } + + // return the search request + String[] indices = wildcardIndices(searchRequest.getIndices()); + if (LOG.isDebugEnabled()) { + LOG.debug("Built Elasticsearch request; indices={}, request={}", indices, searchBuilder.toString()); + } + return new org.elasticsearch.action.search.SearchRequest() + .indices(indices) + .source(searchBuilder); + } + + /** + * Builds a search response. + * + * This effectively transforms an Elasticsearch search response into a Metron search response. + * + * @param searchRequest The Metron search request. 
+ * @param esResponse The Elasticsearch search response. + * @return A Metron search response. + * @throws InvalidSearchException + */ + private SearchResponse buildSearchResponse( + SearchRequest searchRequest, + org.elasticsearch.action.search.SearchResponse esResponse) throws InvalidSearchException { + + SearchResponse searchResponse = new SearchResponse(); + + searchResponse.setTotal(esResponse.getHits().getTotalHits()); + + // search hits --> search results + List<SearchResult> results = new ArrayList<>(); + for(SearchHit hit: esResponse.getHits().getHits()) { + results.add(getSearchResult(hit, searchRequest.getFields())); + } + searchResponse.setResults(results); + + // handle facet fields - if (searchRequest.getFacetFields().isPresent()) { - List<String> facetFields = searchRequest.getFacetFields().get(); ++ if (searchRequest.getFacetFields() != null) { ++ List<String> facetFields = searchRequest.getFacetFields(); + Map<String, FieldType> commonColumnMetadata; + try { + commonColumnMetadata = columnMetadataDao.getColumnMetadata(searchRequest.getIndices()); + } catch (IOException e) { + throw new InvalidSearchException(String.format( + "Could not get common column metadata for indices %s", + Arrays.toString(searchRequest.getIndices().toArray()))); + } + searchResponse.setFacetCounts(getFacetCounts(facetFields, esResponse.getAggregations(), commonColumnMetadata )); + } + + if (LOG.isDebugEnabled()) { + LOG.debug("Built search response; response={}", ElasticsearchUtils.toJSON(searchResponse).orElse("???")); + } + return searchResponse; + } + + private org.elasticsearch.search.sort.SortOrder getElasticsearchSortOrder( + org.apache.metron.indexing.dao.search.SortOrder sortOrder) { + return sortOrder == org.apache.metron.indexing.dao.search.SortOrder.DESC ? 
+ org.elasticsearch.search.sort.SortOrder.DESC : org.elasticsearch.search.sort.SortOrder.ASC; + } + + private String getFacetAggregationName(String field) { + return String.format("%s_count", field); + } + + private String[] wildcardIndices(List<String> indices) { + if(indices == null) + return new String[] {}; + + return indices + .stream() + .map(index -> String.format("%s%s*", index, INDEX_NAME_DELIMITER)) + .toArray(value -> new String[indices.size()]); + } + - private SearchResult getSearchResult(SearchHit searchHit, Optional<List<String>> fields) { ++ private SearchResult getSearchResult(SearchHit searchHit, List<String> fields) { + SearchResult searchResult = new SearchResult(); + searchResult.setId(searchHit.getId()); + Map<String, Object> source; - if (fields.isPresent()) { ++ if (fields != null) { + Map<String, Object> resultSourceAsMap = searchHit.getSourceAsMap(); + source = new HashMap<>(); - fields.get().forEach(field -> { ++ fields.forEach(field -> { + source.put(field, resultSourceAsMap.get(field)); + }); + } else { + source = searchHit.getSource(); + } + searchResult.setSource(source); + searchResult.setScore(searchHit.getScore()); + searchResult.setIndex(searchHit.getIndex()); + return searchResult; + } + + private Map<String, Map<String, Long>> getFacetCounts(List<String> fields, Aggregations aggregations, Map<String, FieldType> commonColumnMetadata) { + Map<String, Map<String, Long>> fieldCounts = new HashMap<>(); + for (String field: fields) { + Map<String, Long> valueCounts = new HashMap<>(); - Aggregation aggregation = aggregations.get(getFacetAggregationName(field)); - if (aggregation instanceof Terms) { - Terms terms = (Terms) aggregation; - terms.getBuckets().stream().forEach(bucket -> valueCounts.put(formatKey(bucket.getKey(), commonColumnMetadata.get(field)), bucket.getDocCount())); ++ if(aggregations != null ) { ++ Aggregation aggregation = aggregations.get(getFacetAggregationName(field)); ++ if (aggregation instanceof Terms) { ++ Terms 
terms = (Terms) aggregation; ++ terms.getBuckets().stream().forEach(bucket -> valueCounts.put(formatKey(bucket.getKey(), commonColumnMetadata.get(field)), bucket.getDocCount())); ++ } + } + fieldCounts.put(field, valueCounts); + } + return fieldCounts; + } + + private String formatKey(Object key, FieldType type) { + if (FieldType.IP.equals(type) && key instanceof Long) { + return LegacyIpFieldMapper.longToIp((Long) key); + } else if (FieldType.BOOLEAN.equals(type)) { + return (Long) key == 1 ? "true" : "false"; + } else { + return key.toString(); + } + } + + /** + * Defers to a provided {@link org.elasticsearch.index.query.QueryBuilder} for the query. + * @param groupRequest The request defining the parameters of the grouping + * @param queryBuilder The actual query to be run. Intended for if the SearchRequest requires wrapping + * @return The results of the query + * @throws InvalidSearchException When the query is malformed or the current state doesn't allow search + */ + protected GroupResponse group(GroupRequest groupRequest, QueryBuilder queryBuilder) + throws InvalidSearchException { + org.elasticsearch.action.search.SearchRequest esRequest; + org.elasticsearch.action.search.SearchResponse esResponse; + + if (client == null) { + throw new InvalidSearchException("Uninitialized Dao! You must call init() prior to use."); + } + if (groupRequest.getGroups() == null || groupRequest.getGroups().size() == 0) { + throw new InvalidSearchException("At least 1 group must be provided."); + } + + esRequest = buildGroupRequest(groupRequest, queryBuilder); + esResponse = requestSubmitter.submitSearch(esRequest); + GroupResponse response = buildGroupResponse(groupRequest, esResponse); + + return response; + } + + /** + * Builds a group search request. + * @param groupRequest The Metron group request. + * @param queryBuilder The search query. + * @return An Elasticsearch search request. 
+ */ + private org.elasticsearch.action.search.SearchRequest buildGroupRequest( + GroupRequest groupRequest, + QueryBuilder queryBuilder) { + + // handle groups + TermsAggregationBuilder groups = getGroupsTermBuilder(groupRequest, 0); + final SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder() + .query(queryBuilder) + .aggregation(groups); + + // return the search request + String[] indices = wildcardIndices(groupRequest.getIndices()); + return new org.elasticsearch.action.search.SearchRequest() + .indices(indices) + .source(searchSourceBuilder); + } + + private TermsAggregationBuilder getGroupsTermBuilder(GroupRequest groupRequest, int index) { + List<Group> groups = groupRequest.getGroups(); + Group group = groups.get(index); + String aggregationName = getGroupByAggregationName(group.getField()); + TermsAggregationBuilder termsBuilder = AggregationBuilders.terms(aggregationName); + termsBuilder + .field(group.getField()) + .size(accessConfig.getMaxSearchGroups()) + .order(getElasticsearchGroupOrder(group.getOrder())); + if (index < groups.size() - 1) { + termsBuilder.subAggregation(getGroupsTermBuilder(groupRequest, index + 1)); + } + Optional<String> scoreField = groupRequest.getScoreField(); + if (scoreField.isPresent()) { + SumAggregationBuilder scoreSumAggregationBuilder = AggregationBuilders.sum(getSumAggregationName(scoreField.get())).field(scoreField.get()).missing(0); + termsBuilder.subAggregation(scoreSumAggregationBuilder); + } + return termsBuilder; + } + + private String getGroupByAggregationName(String field) { + return String.format("%s_group", field); + } + + private String getSumAggregationName(String field) { + return String.format("%s_score", field); + } + + private Order getElasticsearchGroupOrder(GroupOrder groupOrder) { + if (groupOrder.getGroupOrderType() == GroupOrderType.TERM) { + return groupOrder.getSortOrder() == SortOrder.ASC ? 
Order.term(true) : Order.term(false); + } else { + return groupOrder.getSortOrder() == SortOrder.ASC ? Order.count(true) : Order.count(false); + } + } + + /** + * Build a group response. + * @param groupRequest The original group request. + * @param response The search response. + * @return A group response. + * @throws InvalidSearchException + */ + private GroupResponse buildGroupResponse( + GroupRequest groupRequest, + org.elasticsearch.action.search.SearchResponse response) throws InvalidSearchException { + + // build the search response + Map<String, FieldType> commonColumnMetadata; + try { + commonColumnMetadata = columnMetadataDao.getColumnMetadata(groupRequest.getIndices()); + } catch (IOException e) { + throw new InvalidSearchException(String.format("Could not get common column metadata for indices %s", + Arrays.toString(groupRequest.getIndices().toArray()))); + } + + GroupResponse groupResponse = new GroupResponse(); + groupResponse.setGroupedBy(groupRequest.getGroups().get(0).getField()); + groupResponse.setGroupResults(getGroupResults(groupRequest, 0, response.getAggregations(), commonColumnMetadata)); + return groupResponse; + } + + private List<GroupResult> getGroupResults(GroupRequest groupRequest, int index, Aggregations aggregations, Map<String, FieldType> commonColumnMetadata) { + List<Group> groups = groupRequest.getGroups(); + String field = groups.get(index).getField(); + Terms terms = aggregations.get(getGroupByAggregationName(field)); + List<GroupResult> searchResultGroups = new ArrayList<>(); + for(Bucket bucket: terms.getBuckets()) { + GroupResult groupResult = new GroupResult(); + groupResult.setKey(formatKey(bucket.getKey(), commonColumnMetadata.get(field))); + groupResult.setTotal(bucket.getDocCount()); + Optional<String> scoreField = groupRequest.getScoreField(); + if (scoreField.isPresent()) { + Sum score = bucket.getAggregations().get(getSumAggregationName(scoreField.get())); + groupResult.setScore(score.getValue()); + } + if (index < 
groups.size() - 1) { + groupResult.setGroupedBy(groups.get(index + 1).getField()); + groupResult.setGroupResults(getGroupResults(groupRequest, index + 1, bucket.getAggregations(), commonColumnMetadata)); + } + searchResultGroups.add(groupResult); + } + return searchResultGroups; + } + + /** + * Return the search hits matching the given GUIDs and, when provided, sensor types. + * A callback can be specified to transform each hit into a type T. + * Every hit for which the callback yields a value is returned; an empty list is returned when no GUIDs are given. + */ + <T> List<T> searchByGuids(Collection<String> guids, Collection<String> sensorTypes, + Function<SearchHit, Optional<T>> callback) { + if(guids == null || guids.isEmpty()) { + return Collections.EMPTY_LIST; + } + QueryBuilder query = null; + IdsQueryBuilder idsQuery = null; + if (sensorTypes != null) { + String[] types = sensorTypes.stream().map(sensorType -> sensorType + "_doc").toArray(String[]::new); + idsQuery = QueryBuilders.idsQuery(types); + } else { + idsQuery = QueryBuilders.idsQuery(); + } + + for(String guid : guids) { + query = idsQuery.addIds(guid); + } + + SearchRequestBuilder request = client.prepareSearch() + .setQuery(query) + .setSize(guids.size()) + ; + org.elasticsearch.action.search.SearchResponse response = request.get(); + SearchHits hits = response.getHits(); + List<T> results = new ArrayList<>(); + for (SearchHit hit : hits) { + Optional<T> result = callback.apply(hit); + if (result.isPresent()) { + results.add(result.get()); + } + } + return results; + } + + private Optional<Document> toDocument(final String guid, SearchHit hit) { + Long ts = 0L; + String doc = hit.getSourceAsString(); + String sourceType = toSourceType(hit.getType()); + try { + return Optional.of(new Document(doc, guid, sourceType, ts)); + } catch (IOException e) { + throw new IllegalStateException("Unable to retrieve latest: " + e.getMessage(), e); + } + } + + /** + * Returns the source type based on a given doc type. + * @param docType The document type. + * @return The source type. 
+ */ + private String toSourceType(String docType) { + return Iterables.getFirst(Splitter.on("_doc").split(docType), null); + } +} http://git-wip-us.apache.org/repos/asf/metron/blob/d0a4e4c0/metron-platform/metron-solr/src/main/java/org/apache/metron/solr/dao/SolrSearchDao.java ---------------------------------------------------------------------- diff --cc metron-platform/metron-solr/src/main/java/org/apache/metron/solr/dao/SolrSearchDao.java index e336037,0000000..272b96a mode 100644,000000..100644 --- a/metron-platform/metron-solr/src/main/java/org/apache/metron/solr/dao/SolrSearchDao.java +++ b/metron-platform/metron-solr/src/main/java/org/apache/metron/solr/dao/SolrSearchDao.java @@@ -1,317 -1,0 +1,317 @@@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.metron.solr.dao; + +import com.fasterxml.jackson.core.JsonProcessingException; +import org.apache.metron.common.Constants; +import org.apache.metron.common.utils.JSONUtils; +import org.apache.metron.indexing.dao.AccessConfig; +import org.apache.metron.indexing.dao.search.GetRequest; +import org.apache.metron.indexing.dao.search.Group; +import org.apache.metron.indexing.dao.search.GroupOrder; +import org.apache.metron.indexing.dao.search.GroupOrderType; +import org.apache.metron.indexing.dao.search.GroupRequest; +import org.apache.metron.indexing.dao.search.GroupResponse; +import org.apache.metron.indexing.dao.search.GroupResult; +import org.apache.metron.indexing.dao.search.InvalidSearchException; +import org.apache.metron.indexing.dao.search.SearchDao; +import org.apache.metron.indexing.dao.search.SearchRequest; +import org.apache.metron.indexing.dao.search.SearchResponse; +import org.apache.metron.indexing.dao.search.SearchResult; +import org.apache.metron.indexing.dao.search.SortField; +import org.apache.metron.indexing.dao.search.SortOrder; +import org.apache.metron.indexing.dao.update.Document; +import org.apache.solr.client.solrj.SolrClient; +import org.apache.solr.client.solrj.SolrQuery; +import org.apache.solr.client.solrj.SolrQuery.ORDER; +import org.apache.solr.client.solrj.SolrServerException; +import org.apache.solr.client.solrj.request.CollectionAdminRequest; +import org.apache.solr.client.solrj.response.FacetField; +import org.apache.solr.client.solrj.response.FacetField.Count; +import org.apache.solr.client.solrj.response.PivotField; +import org.apache.solr.client.solrj.response.QueryResponse; +import org.apache.solr.common.SolrDocument; +import org.apache.solr.common.SolrDocumentList; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.lang.invoke.MethodHandles; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; 
+import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.stream.Collectors; + +import static org.apache.metron.common.Constants.SENSOR_TYPE; + +public class SolrSearchDao implements SearchDao { + + private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + private transient SolrClient client; + private AccessConfig accessConfig; + + public SolrSearchDao(SolrClient client, AccessConfig accessConfig) { + this.client = client; + this.accessConfig = accessConfig; + } + + @Override + public SearchResponse search(SearchRequest searchRequest) throws InvalidSearchException { + if (searchRequest.getQuery() == null) { + throw new InvalidSearchException("Search query is invalid: null"); + } + if (client == null) { + throw new InvalidSearchException("Uninitialized Dao! You must call init() prior to use."); + } + if (searchRequest.getSize() > accessConfig.getMaxSearchResults()) { + throw new InvalidSearchException( + "Search result size must be less than " + accessConfig.getMaxSearchResults()); + } + try { + SolrQuery query = buildSearchRequest(searchRequest); + QueryResponse response = client.query(query); + return buildSearchResponse(searchRequest, response); + } catch (IOException | SolrServerException e) { + String msg = e.getMessage(); + LOG.error(msg, e); + throw new InvalidSearchException(msg, e); + } + } + + @Override + public GroupResponse group(GroupRequest groupRequest) throws InvalidSearchException { + try { + String groupNames = groupRequest.getGroups().stream().map(Group::getField).collect( + Collectors.joining(",")); + SolrQuery query = new SolrQuery() + .setStart(0) + .setRows(0) + .setQuery(groupRequest.getQuery()); + query.set("collection", getCollections(groupRequest.getIndices())); + Optional<String> scoreField = groupRequest.getScoreField(); + if (scoreField.isPresent()) { + query.set("stats", true); + query.set("stats.field", String.format("{!tag=piv1 sum=true}%s", 
scoreField.get())); + } + query.set("facet", true); + query.set("facet.pivot", String.format("{!stats=piv1}%s", groupNames)); + QueryResponse response = client.query(query); + return buildGroupResponse(groupRequest, response); + } catch (IOException | SolrServerException e) { + String msg = e.getMessage(); + LOG.error(msg, e); + throw new InvalidSearchException(msg, e); + } + } + + @Override + public Document getLatest(String guid, String collection) throws IOException { + try { + SolrDocument solrDocument = client.getById(collection, guid); + return toDocument(solrDocument); + } catch (SolrServerException e) { + throw new IOException(e); + } + } + + @Override + public Iterable<Document> getAllLatest(List<GetRequest> getRequests) throws IOException { + Map<String, Collection<String>> collectionIdMap = new HashMap<>(); + for (GetRequest getRequest: getRequests) { + Collection<String> ids = collectionIdMap.getOrDefault(getRequest.getSensorType(), new HashSet<>()); + ids.add(getRequest.getGuid()); + collectionIdMap.put(getRequest.getSensorType(), ids); + } + try { + List<Document> documents = new ArrayList<>(); + for (String collection: collectionIdMap.keySet()) { + SolrDocumentList solrDocumentList = client.getById(collectionIdMap.get(collection), + new SolrQuery().set("collection", collection)); + documents.addAll(solrDocumentList.stream().map(this::toDocument).collect(Collectors.toList())); + } + return documents; + } catch (SolrServerException e) { + throw new IOException(e); + } + } + + protected SolrQuery buildSearchRequest( + SearchRequest searchRequest) throws IOException, SolrServerException { + SolrQuery query = new SolrQuery() + .setStart(searchRequest.getFrom()) + .setRows(searchRequest.getSize()) + .setQuery(searchRequest.getQuery()); + + // handle sort fields + for (SortField sortField : searchRequest.getSort()) { + query.addSort(sortField.getField(), getSolrSortOrder(sortField.getSortOrder())); + } + + // handle search fields - Optional<List<String>> 
fields = searchRequest.getFields(); - if (fields.isPresent()) { - fields.get().forEach(query::addField); ++ List<String> fields = searchRequest.getFields(); ++ if (fields != null) { ++ fields.forEach(query::addField); + } + + //handle facet fields - Optional<List<String>> facetFields = searchRequest.getFacetFields(); - if (facetFields.isPresent()) { - facetFields.get().forEach(query::addFacetField); ++ List<String> facetFields = searchRequest.getFacetFields(); ++ if (facetFields != null) { ++ facetFields.forEach(query::addFacetField); + } + + query.set("collection", getCollections(searchRequest.getIndices())); + + return query; + } + + private String getCollections(List<String> indices) throws IOException, SolrServerException { + List<String> existingCollections = CollectionAdminRequest.listCollections(client); + return indices.stream().filter(existingCollections::contains).collect(Collectors.joining(",")); + } + + private SolrQuery.ORDER getSolrSortOrder( + SortOrder sortOrder) { + return sortOrder == SortOrder.DESC ? 
+ ORDER.desc : ORDER.asc; + } + + protected SearchResponse buildSearchResponse( + SearchRequest searchRequest, + QueryResponse solrResponse) { + + SearchResponse searchResponse = new SearchResponse(); + SolrDocumentList solrDocumentList = solrResponse.getResults(); + searchResponse.setTotal(solrDocumentList.getNumFound()); + + // search hits --> search results + List<SearchResult> results = solrDocumentList.stream() + .map(solrDocument -> getSearchResult(solrDocument, searchRequest.getFields())) + .collect(Collectors.toList()); + searchResponse.setResults(results); + + // handle facet fields - Optional<List<String>> facetFields = searchRequest.getFacetFields(); - if (facetFields.isPresent()) { - searchResponse.setFacetCounts(getFacetCounts(facetFields.get(), solrResponse)); ++ List<String> facetFields = searchRequest.getFacetFields(); ++ if (facetFields != null) { ++ searchResponse.setFacetCounts(getFacetCounts(facetFields, solrResponse)); + } + + if (LOG.isDebugEnabled()) { + String response; + try { + response = JSONUtils.INSTANCE.toJSON(searchResponse, false); + } catch (JsonProcessingException e) { + response = e.getMessage(); + } + LOG.debug("Built search response; response={}", response); + } + return searchResponse; + } + - protected SearchResult getSearchResult(SolrDocument solrDocument, Optional<List<String>> fields) { ++ protected SearchResult getSearchResult(SolrDocument solrDocument, List<String> fields) { + SearchResult searchResult = new SearchResult(); + searchResult.setId((String) solrDocument.getFieldValue(Constants.GUID)); + final Map<String, Object> source = new HashMap<>(); - if (fields.isPresent()) { - fields.get().forEach(field -> source.put(field, solrDocument.getFieldValue(field))); ++ if (fields != null) { ++ fields.forEach(field -> source.put(field, solrDocument.getFieldValue(field))); + } else { + solrDocument.getFieldNames().forEach(field -> source.put(field, solrDocument.getFieldValue(field))); + } + searchResult.setSource(source); + 
return searchResult; + } + + protected Map<String, Map<String, Long>> getFacetCounts(List<String> fields, + QueryResponse solrResponse) { + Map<String, Map<String, Long>> fieldCounts = new HashMap<>(); + for (String field : fields) { + Map<String, Long> valueCounts = new HashMap<>(); + FacetField facetField = solrResponse.getFacetField(field); + for (Count facetCount : facetField.getValues()) { + valueCounts.put(facetCount.getName(), facetCount.getCount()); + } + fieldCounts.put(field, valueCounts); + } + return fieldCounts; + } + + /** + * Build a group response. + * @param groupRequest The original group request. + * @param response The search response. + * @return A group response. + */ + protected GroupResponse buildGroupResponse( + GroupRequest groupRequest, + QueryResponse response) { + String groupNames = groupRequest.getGroups().stream().map(Group::getField).collect( + Collectors.joining(",")); + List<PivotField> pivotFields = response.getFacetPivot().get(groupNames); + GroupResponse groupResponse = new GroupResponse(); + groupResponse.setGroupedBy(groupRequest.getGroups().get(0).getField()); + groupResponse.setGroupResults(getGroupResults(groupRequest, 0, pivotFields)); + return groupResponse; + } + + protected List<GroupResult> getGroupResults(GroupRequest groupRequest, int index, List<PivotField> pivotFields) { + List<Group> groups = groupRequest.getGroups(); + List<GroupResult> searchResultGroups = new ArrayList<>(); + final GroupOrder groupOrder = groups.get(index).getOrder(); + pivotFields.sort((o1, o2) -> { + String s1 = groupOrder.getGroupOrderType() == GroupOrderType.TERM ? + o1.getValue().toString() : Integer.toString(o1.getCount()); + String s2 = groupOrder.getGroupOrderType() == GroupOrderType.TERM ? 
+ o2.getValue().toString() : Integer.toString(o2.getCount()); + if (groupOrder.getSortOrder() == SortOrder.ASC) { + return s1.compareTo(s2); + } else { + return s2.compareTo(s1); + } + }); + + for(PivotField pivotField: pivotFields) { + GroupResult groupResult = new GroupResult(); + groupResult.setKey(pivotField.getValue().toString()); + groupResult.setTotal(pivotField.getCount()); + Optional<String> scoreField = groupRequest.getScoreField(); + if (scoreField.isPresent()) { + groupResult.setScore((Double) pivotField.getFieldStatsInfo().get("score").getSum()); + } + if (index < groups.size() - 1) { + groupResult.setGroupedBy(groups.get(index + 1).getField()); + groupResult.setGroupResults(getGroupResults(groupRequest, index + 1, pivotField.getPivot())); + } + searchResultGroups.add(groupResult); + } + return searchResultGroups; + } + + protected Document toDocument(SolrDocument solrDocument) { + Map<String, Object> document = new HashMap<>(); + solrDocument.getFieldNames().stream() + .filter(name -> !name.equals(SolrDao.VERSION_FIELD)) + .forEach(name -> document.put(name, solrDocument.getFieldValue(name))); + return new Document(document, + (String) solrDocument.getFieldValue(Constants.GUID), + (String) solrDocument.getFieldValue(SENSOR_TYPE), 0L); + } +}
